Check for transaction forwarding delay to detect an expired transaction before forwarding it (#4249)

Also refactored code for forwarding packets, and added test for it
This commit is contained in:
Pankaj Garg 2019-05-10 14:28:38 -07:00 committed by GitHub
parent 06a93dcb43
commit 133be2df51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 10 deletions

View File

@ -20,7 +20,10 @@ use solana_runtime::accounts_db::ErrorCounters;
use solana_runtime::bank::Bank;
use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use solana_sdk::timing::{
self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES,
MAX_TRANSACTION_FORWARDING_DELAY,
};
use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::cmp;
use std::net::UdpSocket;
@ -114,17 +117,19 @@ impl BankingStage {
Self { bank_thread_hdls }
}
fn filter_valid_packets_for_forwarding(all_packets: &[(Packets, Vec<usize>)]) -> Vec<&Packet> {
all_packets
.iter()
.flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x]))
.collect()
}
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
tpu_via_blobs: &std::net::SocketAddr,
unprocessed_packets: &[(Packets, Vec<usize>)],
) -> std::io::Result<()> {
let packets: Vec<&Packet> = unprocessed_packets
.iter()
.flat_map(|(p, unprocessed_indexes)| {
unprocessed_indexes.iter().map(move |x| &p.packets[*x])
})
.collect();
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets);
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
let blobs = packet::packets_to_blobs(&packets);
@ -577,7 +582,7 @@ impl BankingStage {
let result = bank.check_transactions(
&transactions,
&filter,
MAX_RECENT_BLOCKHASHES / 2,
(MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2,
&mut error_counters,
);
@ -621,13 +626,17 @@ impl BankingStage {
.filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None })
.collect();
if bank_shutdown {
unprocessed_packets.push((msgs, packet_indexes));
if !packet_indexes.is_empty() {
unprocessed_packets.push((msgs, packet_indexes));
}
continue;
}
let bank = poh.lock().unwrap().bank();
if bank.is_none() {
unprocessed_packets.push((msgs, packet_indexes));
if !packet_indexes.is_empty() {
unprocessed_packets.push((msgs, packet_indexes));
}
continue;
}
let bank = bank.unwrap();
@ -718,6 +727,7 @@ mod tests {
use crate::packet::to_packets;
use crate::poh_recorder::WorkingBank;
use crate::{get_tmp_ledger_path, tmp_ledger_name};
use itertools::Itertools;
use solana_sdk::instruction::InstructionError;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
@ -1402,4 +1412,41 @@ mod tests {
}
Blocktree::destroy(&ledger_path).unwrap();
}
#[test]
fn test_filter_valid_packets() {
solana_logger::setup();
let all_packets = (0..16)
.map(|packets_id| {
let packets = Packets::new(
(0..32)
.map(|packet_id| {
let mut p = Packet::default();
p.meta.port = packets_id << 8 | packet_id;
p
})
.collect_vec(),
);
let valid_indexes = (0..32)
.filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
.collect_vec();
(packets, valid_indexes)
})
.collect_vec();
let result = BankingStage::filter_valid_packets_for_forwarding(&all_packets);
assert_eq!(result.len(), 256);
let _ = result
.into_iter()
.enumerate()
.map(|(index, p)| {
let packets_id = index / 16;
let packet_id = (index % 16) * 2 + 1;
assert_eq!(p.meta.port, (packets_id << 8 | packet_id) as u16);
})
.collect_vec();
}
}

View File

@ -24,6 +24,10 @@ pub const MAX_HASH_AGE_IN_SECONDS: usize = 120;
// This must be <= MAX_HASH_AGE_IN_SECONDS, otherwise there's risk for DuplicateSignature errors
pub const MAX_RECENT_BLOCKHASHES: usize = MAX_HASH_AGE_IN_SECONDS;
/// This is maximum time consumed in forwarding a transaction from one node to next, before
/// it can be processed in the target node
pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 3;
pub fn duration_as_ns(d: &Duration) -> u64 {
d.as_secs() * 1_000_000_000 + u64::from(d.subsec_nanos())
}