Forward transactions to the leader for next Nth slot (#4806)

* review comments
This commit is contained in:
Pankaj Garg 2019-06-24 15:56:50 -07:00 committed by GitHub
parent 29611fb61d
commit 3f8ff23125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 17 deletions

View File

@ -22,8 +22,8 @@ use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{
self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES,
MAX_TRANSACTION_FORWARDING_DELAY,
self, duration_as_us, DEFAULT_NUM_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT,
MAX_RECENT_BLOCKHASHES, MAX_TRANSACTION_FORWARDING_DELAY,
};
use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::net::UdpSocket;
@ -38,6 +38,9 @@ use sys_info;
type PacketsAndOffsets = (Packets, Vec<usize>);
pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
/// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4;
// number of threads is 1 until mt bank is ready
pub const NUM_THREADS: u32 = 10;
@ -249,17 +252,15 @@ impl BankingStage {
buffered_packets: &mut Vec<PacketsAndOffsets>,
enable_forwarding: bool,
) -> Result<()> {
let (decision, next_leader) = {
let decision = {
let poh = poh_recorder.lock().unwrap();
let next_leader = poh.next_slot_leader();
(
Self::consume_or_forward_packets(
next_leader,
poh.bank().is_some(),
poh.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2),
my_pubkey,
Self::consume_or_forward_packets(
poh.next_slot_leader(),
poh.bank().is_some(),
poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
),
next_leader,
my_pubkey,
)
};
@ -272,6 +273,9 @@ impl BankingStage {
}
BufferedPacketsDecision::Forward => {
if enable_forwarding {
let poh = poh_recorder.lock().unwrap();
let next_leader =
poh.leader_after_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
next_leader.map_or(Ok(()), |leader_pubkey| {
let leader_addr = {
cluster_info
@ -608,10 +612,20 @@ impl BankingStage {
let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes);
let mut error_counters = ErrorCounters::default();
// The following code also checks if the blockhash for a transaction is too old
// The check accounts for
// 1. Transaction forwarding delay
// 2. The slot at which the next leader will actually process the transaction
// Drop the transaction if it will expire by the time the next node receives and processes it
let result = bank.check_transactions(
transactions,
&filter,
(MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2,
(MAX_RECENT_BLOCKHASHES / 2)
.saturating_sub(MAX_TRANSACTION_FORWARDING_DELAY)
.saturating_sub(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET * bank.ticks_per_slot()
/ DEFAULT_NUM_TICKS_PER_SECOND) as usize,
),
&mut error_counters,
);

View File

@ -1,5 +1,6 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET;
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::service::Service;
@ -61,11 +62,11 @@ impl FetchStage {
batch.push(more);
}
if poh_recorder
.lock()
.unwrap()
.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2)
{
if poh_recorder.lock().unwrap().would_be_leader(
FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET
.saturating_add(1)
.saturating_mul(DEFAULT_TICKS_PER_SLOT),
) {
inc_new_counter_debug!("fetch_stage-honor_forwards", len);
for packets in batch {
if sendr.send(packets).is_err() {

View File

@ -105,6 +105,13 @@ impl PohRecorder {
self.leader_schedule_cache.slot_leader_at(slot + 1, None)
}
pub fn leader_after_slots(&self, slots: u64) -> Option<Pubkey> {
let slot =
leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height());
self.leader_schedule_cache
.slot_leader_at(slot + slots, None)
}
pub fn start_slot(&self) -> u64 {
self.start_slot
}