Send votes from banking stage to vote listener (#11434)

*  Send votes from banking stage to vote listener

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-08-07 11:21:35 -07:00 committed by GitHub
parent b7c2681903
commit 7e25130529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 268 additions and 156 deletions

1
Cargo.lock generated
View File

@ -4134,6 +4134,7 @@ dependencies = [
"bv", "bv",
"byteorder", "byteorder",
"bzip2", "bzip2",
"crossbeam-channel",
"dir-diff", "dir-diff",
"flate2", "flate2",
"fnv", "fnv",

View File

@ -167,6 +167,7 @@ fn main() {
let (verified_sender, verified_receiver) = unbounded(); let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new(&genesis_config); let bank0 = Bank::new(&genesis_config);
let mut bank_forks = BankForks::new(bank0); let mut bank_forks = BankForks::new(bank0);
let mut bank = bank_forks.working_bank(); let mut bank = bank_forks.working_bank();
@ -224,6 +225,7 @@ fn main() {
verified_receiver, verified_receiver,
vote_receiver, vote_receiver,
None, None,
replay_vote_sender,
); );
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -73,6 +73,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let batch_len = batch.packets.len(); let batch_len = batch.packets.len();
packets.push((batch, vec![0usize; batch_len])); packets.push((batch, vec![0usize; batch_len]));
} }
let (s, _r) = unbounded();
// This tests the performance of buffering packets. // This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor. // If the packet buffers are copied, performance will be poor.
bencher.iter(move || { bencher.iter(move || {
@ -82,6 +83,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&mut packets, &mut packets,
10_000, 10_000,
None, None,
&s,
); );
}); });
@ -190,12 +192,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
create_test_recorder(&bank, &blockstore, None); create_test_recorder(&bank, &blockstore, None);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let _banking_stage = BankingStage::new( let _banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, verified_receiver,
vote_receiver, vote_receiver,
None, None,
s,
); );
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -24,7 +24,9 @@ use solana_perf::{
use solana_runtime::{ use solana_runtime::{
accounts_db::ErrorCounters, accounts_db::ErrorCounters,
bank::{Bank, TransactionBalancesSet, TransactionProcessResult}, bank::{Bank, TransactionBalancesSet, TransactionProcessResult},
bank_utils,
transaction_batch::TransactionBatch, transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{ use solana_sdk::{
clock::{ clock::{
@ -81,6 +83,7 @@ impl BankingStage {
verified_receiver: CrossbeamReceiver<Vec<Packets>>, verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>, verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
) -> Self { ) -> Self {
Self::new_num_threads( Self::new_num_threads(
cluster_info, cluster_info,
@ -89,6 +92,7 @@ impl BankingStage {
verified_vote_receiver, verified_vote_receiver,
Self::num_threads(), Self::num_threads(),
transaction_status_sender, transaction_status_sender,
gossip_vote_sender,
) )
} }
@ -99,6 +103,7 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>, verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
num_threads: u32, num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
) -> Self { ) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
@ -119,6 +124,7 @@ impl BankingStage {
let cluster_info = cluster_info.clone(); let cluster_info = cluster_info.clone();
let mut recv_start = Instant::now(); let mut recv_start = Instant::now();
let transaction_status_sender = transaction_status_sender.clone(); let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
Builder::new() Builder::new()
.name("solana-banking-stage-tx".to_string()) .name("solana-banking-stage-tx".to_string())
.spawn(move || { .spawn(move || {
@ -132,7 +138,8 @@ impl BankingStage {
enable_forwarding, enable_forwarding,
i, i,
batch_limit, batch_limit,
transaction_status_sender.clone(), transaction_status_sender,
gossip_vote_sender,
); );
}) })
.unwrap() .unwrap()
@ -168,6 +175,7 @@ impl BankingStage {
buffered_packets: &mut Vec<PacketsAndOffsets>, buffered_packets: &mut Vec<PacketsAndOffsets>,
batch_limit: usize, batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> UnprocessedPackets { ) -> UnprocessedPackets {
let mut unprocessed_packets = vec![]; let mut unprocessed_packets = vec![];
let mut rebuffered_packets = 0; let mut rebuffered_packets = 0;
@ -199,6 +207,7 @@ impl BankingStage {
&msgs, &msgs,
unprocessed_indexes.to_owned(), unprocessed_indexes.to_owned(),
transaction_status_sender.clone(), transaction_status_sender.clone(),
gossip_vote_sender,
); );
new_tx_count += processed; new_tx_count += processed;
@ -283,6 +292,7 @@ impl BankingStage {
) )
} }
#[allow(clippy::too_many_arguments)]
fn process_buffered_packets( fn process_buffered_packets(
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
socket: &std::net::UdpSocket, socket: &std::net::UdpSocket,
@ -292,6 +302,7 @@ impl BankingStage {
enable_forwarding: bool, enable_forwarding: bool,
batch_limit: usize, batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> BufferedPacketsDecision { ) -> BufferedPacketsDecision {
let (leader_at_slot_offset, poh_has_bank, would_be_leader) = { let (leader_at_slot_offset, poh_has_bank, would_be_leader) = {
let poh = poh_recorder.lock().unwrap(); let poh = poh_recorder.lock().unwrap();
@ -319,6 +330,7 @@ impl BankingStage {
buffered_packets, buffered_packets,
batch_limit, batch_limit,
transaction_status_sender, transaction_status_sender,
gossip_vote_sender,
); );
buffered_packets.append(&mut unprocessed); buffered_packets.append(&mut unprocessed);
} }
@ -352,6 +364,7 @@ impl BankingStage {
decision decision
} }
#[allow(clippy::too_many_arguments)]
pub fn process_loop( pub fn process_loop(
my_pubkey: Pubkey, my_pubkey: Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>, verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
@ -362,6 +375,7 @@ impl BankingStage {
id: u32, id: u32,
batch_limit: usize, batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
) { ) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = vec![]; let mut buffered_packets = vec![];
@ -376,6 +390,7 @@ impl BankingStage {
enable_forwarding, enable_forwarding,
batch_limit, batch_limit,
transaction_status_sender.clone(), transaction_status_sender.clone(),
&gossip_vote_sender,
); );
if decision == BufferedPacketsDecision::Hold { if decision == BufferedPacketsDecision::Hold {
// If we are waiting on a new bank, // If we are waiting on a new bank,
@ -403,6 +418,7 @@ impl BankingStage {
id, id,
batch_limit, batch_limit,
transaction_status_sender.clone(), transaction_status_sender.clone(),
&gossip_vote_sender,
) { ) {
Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break, Err(RecvTimeoutError::Disconnected) => break,
@ -501,6 +517,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
batch: &TransactionBatch, batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (Result<usize, PohRecorderError>, Vec<usize>) { ) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut load_execute_time = Measure::start("load_execute_time"); let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce // Use a shorter maximum age when adding transactions into the pipeline. This will reduce
@ -533,24 +550,23 @@ impl BankingStage {
let num_to_commit = num_to_commit.unwrap(); let num_to_commit = num_to_commit.unwrap();
if num_to_commit != 0 { if num_to_commit != 0 {
let transaction_statuses = bank let tx_results = bank.commit_transactions(
.commit_transactions( txs,
txs, None,
None, &mut loaded_accounts,
&mut loaded_accounts, &results,
&results, tx_count,
tx_count, signature_count,
signature_count, );
)
.processing_results;
bank_utils::find_and_send_votes(txs, &tx_results, Some(gossip_vote_sender));
if let Some(sender) = transaction_status_sender { if let Some(sender) = transaction_status_sender {
let post_balances = bank.collect_balances(batch); let post_balances = bank.collect_balances(batch);
send_transaction_status_batch( send_transaction_status_batch(
bank.clone(), bank.clone(),
batch.transactions(), batch.transactions(),
batch.iteration_order_vec(), batch.iteration_order_vec(),
transaction_statuses, tx_results.processing_results,
TransactionBalancesSet::new(pre_balances, post_balances), TransactionBalancesSet::new(pre_balances, post_balances),
sender, sender,
); );
@ -578,6 +594,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
chunk_offset: usize, chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (Result<usize, PohRecorderError>, Vec<usize>) { ) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut lock_time = Measure::start("lock_time"); let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
@ -590,6 +607,7 @@ impl BankingStage {
poh, poh,
&batch, &batch,
transaction_status_sender, transaction_status_sender,
gossip_vote_sender,
); );
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);
@ -618,6 +636,7 @@ impl BankingStage {
transactions: &[Transaction], transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (usize, Vec<usize>) { ) -> (usize, Vec<usize>) {
let mut chunk_start = 0; let mut chunk_start = 0;
let mut unprocessed_txs = vec![]; let mut unprocessed_txs = vec![];
@ -633,6 +652,7 @@ impl BankingStage {
poh, poh,
chunk_start, chunk_start,
transaction_status_sender.clone(), transaction_status_sender.clone(),
gossip_vote_sender,
); );
trace!("process_transactions result: {:?}", result); trace!("process_transactions result: {:?}", result);
@ -764,6 +784,7 @@ impl BankingStage {
msgs: &Packets, msgs: &Packets,
packet_indexes: Vec<usize>, packet_indexes: Vec<usize>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (usize, usize, Vec<usize>) { ) -> (usize, usize, Vec<usize>) {
let (transactions, transaction_to_packet_indexes) = let (transactions, transaction_to_packet_indexes) =
Self::transactions_from_packets(msgs, &packet_indexes); Self::transactions_from_packets(msgs, &packet_indexes);
@ -775,8 +796,13 @@ impl BankingStage {
let tx_len = transactions.len(); let tx_len = transactions.len();
let (processed, unprocessed_tx_indexes) = let (processed, unprocessed_tx_indexes) = Self::process_transactions(
Self::process_transactions(bank, &transactions, poh, transaction_status_sender); bank,
&transactions,
poh,
transaction_status_sender,
gossip_vote_sender,
);
let unprocessed_tx_count = unprocessed_tx_indexes.len(); let unprocessed_tx_count = unprocessed_tx_indexes.len();
@ -846,6 +872,7 @@ impl BankingStage {
.collect() .collect()
} }
#[allow(clippy::too_many_arguments)]
/// Process the incoming packets /// Process the incoming packets
pub fn process_packets( pub fn process_packets(
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
@ -856,6 +883,7 @@ impl BankingStage {
id: u32, id: u32,
batch_limit: usize, batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> Result<UnprocessedPackets, RecvTimeoutError> { ) -> Result<UnprocessedPackets, RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv"); let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?; let mms = verified_receiver.recv_timeout(recv_timeout)?;
@ -898,6 +926,7 @@ impl BankingStage {
&msgs, &msgs,
packet_indexes, packet_indexes,
transaction_status_sender.clone(), transaction_status_sender.clone(),
gossip_vote_sender,
); );
new_tx_count += processed; new_tx_count += processed;
@ -1045,6 +1074,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_config)); let bank = Arc::new(Bank::new(&genesis_config));
let (verified_sender, verified_receiver) = unbounded(); let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blockstore = Arc::new( let blockstore = Arc::new(
@ -1061,6 +1091,7 @@ mod tests {
verified_receiver, verified_receiver,
vote_receiver, vote_receiver,
None, None,
gossip_vote_sender,
); );
drop(verified_sender); drop(verified_sender);
drop(vote_sender); drop(vote_sender);
@ -1095,12 +1126,15 @@ mod tests {
create_test_recorder(&bank, &blockstore, Some(poh_config)); create_test_recorder(&bank, &blockstore, Some(poh_config));
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, verified_receiver,
vote_receiver, vote_receiver,
None, None,
gossip_vote_sender,
); );
trace!("sending bank"); trace!("sending bank");
drop(verified_sender); drop(verified_sender);
@ -1158,12 +1192,15 @@ mod tests {
create_test_recorder(&bank, &blockstore, Some(poh_config)); create_test_recorder(&bank, &blockstore, Some(poh_config));
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, verified_receiver,
vote_receiver, vote_receiver,
None, None,
gossip_vote_sender,
); );
// fund another account so we can send 2 good transactions in a single batch. // fund another account so we can send 2 good transactions in a single batch.
@ -1284,6 +1321,8 @@ mod tests {
let (vote_sender, vote_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let entry_receiver = { let entry_receiver = {
// start a banking_stage to eat verified receiver // start a banking_stage to eat verified receiver
let bank = Arc::new(Bank::new(&genesis_config)); let bank = Arc::new(Bank::new(&genesis_config));
@ -1306,6 +1345,7 @@ mod tests {
vote_receiver, vote_receiver,
2, 2,
None, None,
gossip_vote_sender,
); );
// wait for banking_stage to eat the packets // wait for banking_stage to eat the packets
@ -1683,6 +1723,7 @@ mod tests {
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
BankingStage::process_and_record_transactions( BankingStage::process_and_record_transactions(
&bank, &bank,
@ -1690,6 +1731,7 @@ mod tests {
&poh_recorder, &poh_recorder,
0, 0,
None, None,
&gossip_vote_sender,
) )
.0 .0
.unwrap(); .unwrap();
@ -1726,6 +1768,7 @@ mod tests {
&poh_recorder, &poh_recorder,
0, 0,
None, None,
&gossip_vote_sender,
) )
.0, .0,
Err(PohRecorderError::MaxHeightReached) Err(PohRecorderError::MaxHeightReached)
@ -1777,12 +1820,15 @@ mod tests {
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (result, unprocessed) = BankingStage::process_and_record_transactions( let (result, unprocessed) = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&poh_recorder, &poh_recorder,
0, 0,
None, None,
&gossip_vote_sender,
); );
assert!(result.is_ok()); assert!(result.is_ok());
@ -1866,8 +1912,16 @@ mod tests {
// record // record
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (processed_transactions_count, mut retryable_txs) = let (processed_transactions_count, mut retryable_txs) =
BankingStage::process_transactions(&bank, &transactions, &poh_recorder, None); BankingStage::process_transactions(
&bank,
&transactions,
&poh_recorder,
None,
&gossip_vote_sender,
);
assert_eq!(processed_transactions_count, 0,); assert_eq!(processed_transactions_count, 0,);
@ -1944,12 +1998,15 @@ mod tests {
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
); );
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let _ = BankingStage::process_and_record_transactions( let _ = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&poh_recorder, &poh_recorder,
0, 0,
Some(transaction_status_sender), Some(transaction_status_sender),
&gossip_vote_sender,
); );
transaction_status_service.join().unwrap(); transaction_status_service.join().unwrap();

View File

@ -15,10 +15,7 @@ use crossbeam_channel::{
}; };
use itertools::izip; use itertools::izip;
use log::*; use log::*;
use solana_ledger::{ use solana_ledger::blockstore::Blockstore;
blockstore::Blockstore,
blockstore_processor::{ReplayVotesReceiver, ReplayedVote},
};
use solana_metrics::inc_new_counter_debug; use solana_metrics::inc_new_counter_debug;
use solana_perf::packet::{self, Packets}; use solana_perf::packet::{self, Packets};
use solana_runtime::{ use solana_runtime::{
@ -26,6 +23,7 @@ use solana_runtime::{
bank_forks::BankForks, bank_forks::BankForks,
epoch_stakes::{EpochAuthorizedVoters, EpochStakes}, epoch_stakes::{EpochAuthorizedVoters, EpochStakes},
stakes::Stakes, stakes::Stakes,
vote_sender_types::{ReplayVoteReceiver, ReplayedVote},
}; };
use solana_sdk::{ use solana_sdk::{
clock::{Epoch, Slot}, clock::{Epoch, Slot},
@ -248,7 +246,7 @@ impl ClusterInfoVoteListener {
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVotesReceiver, replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
) -> Self { ) -> Self {
let exit_ = exit.clone(); let exit_ = exit.clone();
@ -420,7 +418,7 @@ impl ClusterInfoVoteListener {
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVotesReceiver, replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
) -> Result<()> { ) -> Result<()> {
let mut optimistic_confirmation_verifier = let mut optimistic_confirmation_verifier =
@ -478,7 +476,7 @@ impl ClusterInfoVoteListener {
root_bank: &Bank, root_bank: &Bank,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVotesReceiver, replay_votes_receiver: &ReplayVoteReceiver,
) -> Result<Vec<(Slot, Hash)>> { ) -> Result<Vec<(Slot, Hash)>> {
Self::get_and_process_votes( Self::get_and_process_votes(
gossip_vote_txs_receiver, gossip_vote_txs_receiver,
@ -496,7 +494,7 @@ impl ClusterInfoVoteListener {
root_bank: &Bank, root_bank: &Bank,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVotesReceiver, replay_votes_receiver: &ReplayVoteReceiver,
) -> Result<Vec<(Slot, Hash)>> { ) -> Result<Vec<(Slot, Hash)>> {
let mut sel = Select::new(); let mut sel = Select::new();
sel.recv(gossip_vote_txs_receiver); sel.recv(gossip_vote_txs_receiver);
@ -772,12 +770,12 @@ impl ClusterInfoVoteListener {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use solana_ledger::blockstore_processor::ReplayVotesSender;
use solana_perf::packet; use solana_perf::packet;
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank::Bank,
commitment::BlockCommitmentCache, commitment::BlockCommitmentCache,
genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs},
vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{ use solana_sdk::{
hash::Hash, hash::Hash,
@ -1040,7 +1038,7 @@ mod tests {
validator_voting_keypairs: &[ValidatorVoteKeypairs], validator_voting_keypairs: &[ValidatorVoteKeypairs],
switch_proof_hash: Option<Hash>, switch_proof_hash: Option<Hash>,
votes_sender: &VerifiedVoteTransactionsSender, votes_sender: &VerifiedVoteTransactionsSender,
replay_votes_sender: &ReplayVotesSender, replay_votes_sender: &ReplayVoteSender,
) { ) {
validator_voting_keypairs.iter().for_each(|keypairs| { validator_voting_keypairs.iter().for_each(|keypairs| {
let node_keypair = &keypairs.node_keypair; let node_keypair = &keypairs.node_keypair;

View File

@ -21,9 +21,7 @@ use crate::{
use solana_ledger::{ use solana_ledger::{
block_error::BlockError, block_error::BlockError,
blockstore::Blockstore, blockstore::Blockstore,
blockstore_processor::{ blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender},
self, BlockstoreProcessorError, ReplayVotesSender, TransactionStatusSender,
},
entry::VerifyRecyclers, entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
}; };
@ -31,7 +29,7 @@ use solana_measure::{measure::Measure, thread_mem_usage};
use solana_metrics::inc_new_counter_info; use solana_metrics::inc_new_counter_info;
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache, bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache,
snapshot_package::AccountsPackageSender, snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{ use solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
@ -223,7 +221,7 @@ impl ReplayStage {
cluster_slots: Arc<ClusterSlots>, cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
replay_votes_sender: ReplayVotesSender, replay_vote_sender: ReplayVoteSender,
) -> Self { ) -> Self {
let ReplayStageConfig { let ReplayStageConfig {
my_pubkey, my_pubkey,
@ -344,7 +342,7 @@ impl ReplayStage {
&verify_recyclers, &verify_recyclers,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&subscriptions, &subscriptions,
&replay_votes_sender, &replay_vote_sender,
); );
replay_active_banks_time.stop(); replay_active_banks_time.stop();
Self::report_memory(&allocated, "replay_active_banks", start); Self::report_memory(&allocated, "replay_active_banks", start);
@ -940,7 +938,7 @@ impl ReplayStage {
blockstore: &Blockstore, blockstore: &Blockstore,
bank_progress: &mut ForkProgress, bank_progress: &mut ForkProgress,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: &ReplayVotesSender, replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers, verify_recyclers: &VerifyRecyclers,
) -> result::Result<usize, BlockstoreProcessorError> { ) -> result::Result<usize, BlockstoreProcessorError> {
let tx_count_before = bank_progress.replay_progress.num_txs; let tx_count_before = bank_progress.replay_progress.num_txs;
@ -951,7 +949,7 @@ impl ReplayStage {
&mut bank_progress.replay_progress, &mut bank_progress.replay_progress,
false, false,
transaction_status_sender, transaction_status_sender,
Some(replay_votes_sender), Some(replay_vote_sender),
None, None,
verify_recyclers, verify_recyclers,
); );
@ -1214,7 +1212,7 @@ impl ReplayStage {
verify_recyclers: &VerifyRecyclers, verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
replay_votes_sender: &ReplayVotesSender, replay_vote_sender: &ReplayVoteSender,
) -> bool { ) -> bool {
let mut did_complete_bank = false; let mut did_complete_bank = false;
let mut tx_count = 0; let mut tx_count = 0;
@ -1260,7 +1258,7 @@ impl ReplayStage {
&blockstore, &blockstore,
bank_progress, bank_progress,
transaction_status_sender.clone(), transaction_status_sender.clone(),
replay_votes_sender, replay_vote_sender,
verify_recyclers, verify_recyclers,
); );
match replay_result { match replay_result {
@ -2417,7 +2415,7 @@ pub(crate) mod tests {
F: Fn(&Keypair, Arc<Bank>) -> Vec<Shred>, F: Fn(&Keypair, Arc<Bank>) -> Vec<Shred>,
{ {
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let (replay_votes_sender, _replay_votes_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let res = { let res = {
let blockstore = Arc::new( let blockstore = Arc::new(
Blockstore::open(&ledger_path) Blockstore::open(&ledger_path)
@ -2442,8 +2440,8 @@ pub(crate) mod tests {
&blockstore, &blockstore,
&mut bank0_progress, &mut bank0_progress,
None, None,
&replay_votes_sender, &replay_vote_sender,
&VerifyRecyclers::default(), &&VerifyRecyclers::default(),
); );
// Check that the erroring bank was marked as dead in the progress map // Check that the erroring bank was marked as dead in the progress map
@ -2606,7 +2604,7 @@ pub(crate) mod tests {
blockstore.set_roots(&[slot]).unwrap(); blockstore.set_roots(&[slot]).unwrap();
let (transaction_status_sender, transaction_status_receiver) = unbounded(); let (transaction_status_sender, transaction_status_receiver) = unbounded();
let (replay_votes_sender, _replay_votes_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let transaction_status_service = TransactionStatusService::new( let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver, transaction_status_receiver,
blockstore, blockstore,
@ -2620,7 +2618,7 @@ pub(crate) mod tests {
&entries, &entries,
true, true,
Some(transaction_status_sender), Some(transaction_status_sender),
Some(&replay_votes_sender), Some(&replay_vote_sender),
); );
transaction_status_service.join().unwrap(); transaction_status_service.join().unwrap();

View File

@ -13,11 +13,11 @@ use crate::{
sigverify_stage::SigVerifyStage, sigverify_stage::SigVerifyStage,
}; };
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use solana_ledger::{ use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender};
blockstore::Blockstore, use solana_runtime::{
blockstore_processor::{ReplayVotesReceiver, TransactionStatusSender}, bank_forks::BankForks,
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
}; };
use solana_runtime::bank_forks::BankForks;
use std::{ use std::{
net::UdpSocket, net::UdpSocket,
sync::{ sync::{
@ -55,7 +55,8 @@ impl Tpu {
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVotesReceiver, replay_vote_receiver: ReplayVoteReceiver,
replay_vote_sender: ReplayVoteSender,
) -> Self { ) -> Self {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender( let fetch_stage = FetchStage::new_with_sender(
@ -82,7 +83,7 @@ impl Tpu {
bank_forks, bank_forks,
subscriptions.clone(), subscriptions.clone(),
verified_vote_sender, verified_vote_sender,
replay_votes_receiver, replay_vote_receiver,
blockstore.clone(), blockstore.clone(),
); );
@ -92,6 +93,7 @@ impl Tpu {
verified_receiver, verified_receiver,
verified_vote_packets_receiver, verified_vote_packets_receiver,
transaction_status_sender, transaction_status_sender,
replay_vote_sender,
); );
let broadcast_stage = broadcast_type.new_broadcast_stage( let broadcast_stage = broadcast_type.new_broadcast_stage(

View File

@ -57,7 +57,7 @@ impl TransactionStatusService {
} = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?; } = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot(); let slot = bank.slot();
for (((transaction, (status, hash_age_kind)), pre_balances), post_balances) in for ((((_, transaction), (status, hash_age_kind)), pre_balances), post_balances) in
OrderedIterator::new(&transactions, iteration_order.as_deref()) OrderedIterator::new(&transactions, iteration_order.as_deref())
.zip(statuses) .zip(statuses)
.zip(balances.pre_balances) .zip(balances.pre_balances)

View File

@ -21,12 +21,12 @@ use crate::{
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use solana_ledger::{ use solana_ledger::{
blockstore::{Blockstore, CompletedSlotsReceiver}, blockstore::{Blockstore, CompletedSlotsReceiver},
blockstore_processor::{ReplayVotesSender, TransactionStatusSender}, blockstore_processor::TransactionStatusSender,
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
}; };
use solana_runtime::{ use solana_runtime::{
bank_forks::BankForks, commitment::BlockCommitmentCache, bank_forks::BankForks, commitment::BlockCommitmentCache,
snapshot_package::AccountsPackageSender, snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{ use solana_sdk::{
pubkey::Pubkey, pubkey::Pubkey,
@ -98,7 +98,7 @@ impl Tvu {
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
replay_votes_sender: ReplayVotesSender, replay_vote_sender: ReplayVoteSender,
tvu_config: TvuConfig, tvu_config: TvuConfig,
) -> Self { ) -> Self {
let keypair: Arc<Keypair> = cluster_info.keypair.clone(); let keypair: Arc<Keypair> = cluster_info.keypair.clone();
@ -199,7 +199,7 @@ impl Tvu {
cluster_slots, cluster_slots,
retransmit_slots_sender, retransmit_slots_sender,
duplicate_slots_reset_receiver, duplicate_slots_reset_receiver,
replay_votes_sender, replay_vote_sender,
); );
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@ -285,7 +285,7 @@ pub mod tests {
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
let (_verified_vote_sender, verified_vote_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded();
let (replay_votes_sender, _replay_votes_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let tvu = Tvu::new( let tvu = Tvu::new(
&vote_keypair.pubkey(), &vote_keypair.pubkey(),
@ -319,7 +319,7 @@ pub mod tests {
Arc::new(VoteTracker::new(&bank)), Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,
replay_votes_sender, replay_vote_sender,
TvuConfig::default(), TvuConfig::default(),
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);

View File

@ -28,7 +28,7 @@ use solana_ledger::{
bank_forks_utils, bank_forks_utils,
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
blockstore_db::BlockstoreRecoveryMode, blockstore_db::BlockstoreRecoveryMode,
blockstore_processor::{self, ReplayVotesSender, TransactionStatusSender}, blockstore_processor::{self, TransactionStatusSender},
create_new_tmp_ledger, create_new_tmp_ledger,
leader_schedule::FixedSchedule, leader_schedule::FixedSchedule,
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
@ -224,7 +224,7 @@ impl Validator {
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
let (replay_votes_sender, replay_votes_receiver) = unbounded(); let (replay_vote_sender, replay_vote_receiver) = unbounded();
let ( let (
genesis_config, genesis_config,
bank_forks, bank_forks,
@ -239,7 +239,7 @@ impl Validator {
rewards_recorder_sender, rewards_recorder_sender,
rewards_recorder_service, rewards_recorder_service,
}, },
) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit, &replay_votes_sender); ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit);
let leader_schedule_cache = Arc::new(leader_schedule_cache); let leader_schedule_cache = Arc::new(leader_schedule_cache);
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
@ -465,7 +465,7 @@ impl Validator {
vote_tracker.clone(), vote_tracker.clone(),
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,
replay_votes_sender, replay_vote_sender.clone(),
TvuConfig { TvuConfig {
max_ledger_shreds: config.max_ledger_shreds, max_ledger_shreds: config.max_ledger_shreds,
halt_on_trusted_validators_accounts_hash_mismatch: config halt_on_trusted_validators_accounts_hash_mismatch: config
@ -493,7 +493,8 @@ impl Validator {
vote_tracker, vote_tracker,
bank_forks, bank_forks,
verified_vote_sender, verified_vote_sender,
replay_votes_receiver, replay_vote_receiver,
replay_vote_sender,
); );
datapoint_info!("validator-new", ("id", id.to_string(), String)); datapoint_info!("validator-new", ("id", id.to_string(), String));
@ -587,7 +588,6 @@ fn new_banks_from_ledger(
ledger_path: &Path, ledger_path: &Path,
poh_verify: bool, poh_verify: bool,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
replay_votes_sender: &ReplayVotesSender,
) -> ( ) -> (
GenesisConfig, GenesisConfig,
BankForks, BankForks,
@ -649,7 +649,6 @@ fn new_banks_from_ledger(
transaction_history_services transaction_history_services
.transaction_status_sender .transaction_status_sender
.clone(), .clone(),
Some(&replay_votes_sender),
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err); error!("Failed to load ledger: {:?}", err);

View File

@ -688,7 +688,6 @@ fn load_bank_forks(
snapshot_config.as_ref(), snapshot_config.as_ref(),
process_options, process_options,
None, None,
None,
) )
} }

View File

@ -2,7 +2,7 @@ use crate::{
blockstore::Blockstore, blockstore::Blockstore,
blockstore_processor::{ blockstore_processor::{
self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions, self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions,
ReplayVotesSender, TransactionStatusSender, TransactionStatusSender,
}, },
entry::VerifyRecyclers, entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
@ -36,7 +36,6 @@ pub fn load(
snapshot_config: Option<&SnapshotConfig>, snapshot_config: Option<&SnapshotConfig>,
process_options: ProcessOptions, process_options: ProcessOptions,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> LoadResult { ) -> LoadResult {
if let Some(snapshot_config) = snapshot_config.as_ref() { if let Some(snapshot_config) = snapshot_config.as_ref() {
info!( info!(
@ -90,7 +89,6 @@ pub fn load(
&process_options, &process_options,
&VerifyRecyclers::default(), &VerifyRecyclers::default(),
transaction_status_sender, transaction_status_sender,
replay_votes_sender,
), ),
Some(deserialized_snapshot_hash), Some(deserialized_snapshot_hash),
); );

View File

@ -6,7 +6,7 @@ use crate::{
entry::{create_ticks, Entry, EntrySlice, EntryVerificationStatus, VerifyRecyclers}, entry::{create_ticks, Entry, EntrySlice, EntryVerificationStatus, VerifyRecyclers},
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
}; };
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::Sender;
use itertools::Itertools; use itertools::Itertools;
use log::*; use log::*;
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
@ -17,8 +17,10 @@ use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::{ use solana_runtime::{
bank::{Bank, TransactionBalancesSet, TransactionProcessResult, TransactionResults}, bank::{Bank, TransactionBalancesSet, TransactionProcessResult, TransactionResults},
bank_forks::BankForks, bank_forks::BankForks,
bank_utils,
transaction_batch::TransactionBatch, transaction_batch::TransactionBatch,
transaction_utils::OrderedIterator, transaction_utils::OrderedIterator,
vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{ use solana_sdk::{
clock::{Slot, MAX_PROCESSING_AGE}, clock::{Slot, MAX_PROCESSING_AGE},
@ -29,7 +31,6 @@ use solana_sdk::{
timing::duration_as_ms, timing::duration_as_ms,
transaction::{Result, Transaction, TransactionError}, transaction::{Result, Transaction, TransactionError},
}; };
use solana_vote_program::{vote_state::Vote, vote_transaction};
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::HashMap, collections::HashMap,
@ -43,10 +44,6 @@ use thiserror::Error;
pub type BlockstoreProcessorResult = pub type BlockstoreProcessorResult =
result::Result<(BankForks, LeaderScheduleCache), BlockstoreProcessorError>; result::Result<(BankForks, LeaderScheduleCache), BlockstoreProcessorError>;
pub type ReplayedVote = (Pubkey, Vote, Option<Hash>);
pub type ReplayVotesSender = Sender<ReplayedVote>;
pub type ReplayVotesReceiver = Receiver<ReplayedVote>;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count()) .num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_processor_{}", ix)) .thread_name(|ix| format!("blockstore_processor_{}", ix))
@ -69,7 +66,7 @@ fn get_first_error(
fee_collection_results: Vec<Result<()>>, fee_collection_results: Vec<Result<()>>,
) -> Option<(Result<()>, Signature)> { ) -> Option<(Result<()>, Signature)> {
let mut first_err = None; let mut first_err = None;
for (result, transaction) in fee_collection_results.iter().zip(OrderedIterator::new( for (result, (_, transaction)) in fee_collection_results.iter().zip(OrderedIterator::new(
batch.transactions(), batch.transactions(),
batch.iteration_order(), batch.iteration_order(),
)) { )) {
@ -98,32 +95,21 @@ fn execute_batch(
batch: &TransactionBatch, batch: &TransactionBatch,
bank: &Arc<Bank>, bank: &Arc<Bank>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>, replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> { ) -> Result<()> {
let ( let (tx_results, balances) = batch.bank().load_execute_and_commit_transactions(
TransactionResults {
fee_collection_results,
processing_results,
},
balances,
) = batch.bank().load_execute_and_commit_transactions(
batch, batch,
MAX_PROCESSING_AGE, MAX_PROCESSING_AGE,
transaction_status_sender.is_some(), transaction_status_sender.is_some(),
); );
if let Some(replay_votes_sender) = replay_votes_sender { bank_utils::find_and_send_votes(batch.transactions(), &tx_results, replay_vote_sender);
for (transaction, (processing_result, _)) in
OrderedIterator::new(batch.transactions(), batch.iteration_order()) let TransactionResults {
.zip(&processing_results) fee_collection_results,
{ processing_results,
if processing_result.is_ok() { ..
if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) { } = tx_results;
let _ = replay_votes_sender.send(parsed_vote);
}
}
}
}
if let Some(sender) = transaction_status_sender { if let Some(sender) = transaction_status_sender {
send_transaction_status_batch( send_transaction_status_batch(
@ -145,7 +131,7 @@ fn execute_batches(
batches: &[TransactionBatch], batches: &[TransactionBatch],
entry_callback: Option<&ProcessCallback>, entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>, replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> { ) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len()); inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| { let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
@ -153,7 +139,7 @@ fn execute_batches(
batches batches
.into_par_iter() .into_par_iter()
.map_with(transaction_status_sender, |sender, batch| { .map_with(transaction_status_sender, |sender, batch| {
let result = execute_batch(batch, bank, sender.clone(), replay_votes_sender); let result = execute_batch(batch, bank, sender.clone(), replay_vote_sender);
if let Some(entry_callback) = entry_callback { if let Some(entry_callback) = entry_callback {
entry_callback(bank); entry_callback(bank);
} }
@ -176,7 +162,7 @@ pub fn process_entries(
entries: &[Entry], entries: &[Entry],
randomize: bool, randomize: bool,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>, replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> { ) -> Result<()> {
process_entries_with_callback( process_entries_with_callback(
bank, bank,
@ -184,7 +170,7 @@ pub fn process_entries(
randomize, randomize,
None, None,
transaction_status_sender, transaction_status_sender,
replay_votes_sender, replay_vote_sender,
) )
} }
@ -194,7 +180,7 @@ fn process_entries_with_callback(
randomize: bool, randomize: bool,
entry_callback: Option<&ProcessCallback>, entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>, replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> { ) -> Result<()> {
// accumulator for entries that can be processed in parallel // accumulator for entries that can be processed in parallel
let mut batches = vec![]; let mut batches = vec![];
@ -211,7 +197,7 @@ fn process_entries_with_callback(
&batches, &batches,
entry_callback, entry_callback,
transaction_status_sender.clone(), transaction_status_sender.clone(),
replay_votes_sender, replay_vote_sender,
)?; )?;
batches.clear(); batches.clear();
for hash in &tick_hashes { for hash in &tick_hashes {
@ -267,7 +253,7 @@ fn process_entries_with_callback(
&batches, &batches,
entry_callback, entry_callback,
transaction_status_sender.clone(), transaction_status_sender.clone(),
replay_votes_sender, replay_vote_sender,
)?; )?;
batches.clear(); batches.clear();
} }
@ -278,7 +264,7 @@ fn process_entries_with_callback(
&batches, &batches,
entry_callback, entry_callback,
transaction_status_sender, transaction_status_sender,
replay_votes_sender, replay_vote_sender,
)?; )?;
for hash in tick_hashes { for hash in tick_hashes {
bank.register_tick(&hash); bank.register_tick(&hash);
@ -345,15 +331,7 @@ pub fn process_blockstore(
info!("processing ledger for slot 0..."); info!("processing ledger for slot 0...");
let recyclers = VerifyRecyclers::default(); let recyclers = VerifyRecyclers::default();
process_bank_0(&bank0, blockstore, &opts, &recyclers)?; process_bank_0(&bank0, blockstore, &opts, &recyclers)?;
process_blockstore_from_root( process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers, None)
genesis_config,
blockstore,
bank0,
&opts,
&recyclers,
None,
None,
)
} }
// Process blockstore from a known root bank // Process blockstore from a known root bank
@ -364,7 +342,6 @@ pub fn process_blockstore_from_root(
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> BlockstoreProcessorResult { ) -> BlockstoreProcessorResult {
info!("processing ledger from slot {}...", bank.slot()); info!("processing ledger from slot {}...", bank.slot());
let allocated = thread_mem_usage::Allocatedp::default(); let allocated = thread_mem_usage::Allocatedp::default();
@ -430,7 +407,6 @@ pub fn process_blockstore_from_root(
opts, opts,
recyclers, recyclers,
transaction_status_sender, transaction_status_sender,
replay_votes_sender,
)?; )?;
(initial_forks, leader_schedule_cache) (initial_forks, leader_schedule_cache)
} else { } else {
@ -520,7 +496,7 @@ fn confirm_full_slot(
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress, progress: &mut ConfirmationProgress,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>, replay_vote_sender: Option<&ReplayVoteSender>,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
let mut timing = ConfirmationTiming::default(); let mut timing = ConfirmationTiming::default();
let skip_verification = !opts.poh_verify; let skip_verification = !opts.poh_verify;
@ -531,7 +507,7 @@ fn confirm_full_slot(
progress, progress,
skip_verification, skip_verification,
transaction_status_sender, transaction_status_sender,
replay_votes_sender, replay_vote_sender,
opts.entry_callback.as_ref(), opts.entry_callback.as_ref(),
recyclers, recyclers,
)?; )?;
@ -592,7 +568,7 @@ pub fn confirm_slot(
progress: &mut ConfirmationProgress, progress: &mut ConfirmationProgress,
skip_verification: bool, skip_verification: bool,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>, replay_vote_sender: Option<&ReplayVoteSender>,
entry_callback: Option<&ProcessCallback>, entry_callback: Option<&ProcessCallback>,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
@ -660,7 +636,7 @@ pub fn confirm_slot(
true, true,
entry_callback, entry_callback,
transaction_status_sender, transaction_status_sender,
replay_votes_sender, replay_vote_sender,
) )
.map_err(BlockstoreProcessorError::from); .map_err(BlockstoreProcessorError::from);
replay_elapsed.stop(); replay_elapsed.stop();
@ -779,7 +755,6 @@ fn load_frozen_forks(
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>,
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> { ) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
let mut initial_forks = HashMap::new(); let mut initial_forks = HashMap::new();
let mut last_status_report = Instant::now(); let mut last_status_report = Instant::now();
@ -819,6 +794,7 @@ fn load_frozen_forks(
let initial_allocation = allocated.get(); let initial_allocation = allocated.get();
let mut progress = ConfirmationProgress::new(last_entry_hash); let mut progress = ConfirmationProgress::new(last_entry_hash);
if process_single_slot( if process_single_slot(
blockstore, blockstore,
&bank, &bank,
@ -826,7 +802,7 @@ fn load_frozen_forks(
recyclers, recyclers,
&mut progress, &mut progress,
transaction_status_sender.clone(), transaction_status_sender.clone(),
replay_votes_sender, None,
) )
.is_err() .is_err()
{ {
@ -877,11 +853,11 @@ fn process_single_slot(
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress, progress: &mut ConfirmationProgress,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_votes_sender: Option<&ReplayVotesSender>, replay_vote_sender: Option<&ReplayVoteSender>,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
// Mark corrupt slots as dead so validators don't replay this slot and // Mark corrupt slots as dead so validators don't replay this slot and
// see DuplicateSignature errors later in ReplayStage // see DuplicateSignature errors later in ReplayStage
confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender, replay_votes_sender).map_err(|err| { confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender, replay_vote_sender).map_err(|err| {
let slot = bank.slot(); let slot = bank.slot();
warn!("slot {} failed to verify: {}", slot, err); warn!("slot {} failed to verify: {}", slot, err);
if blockstore.is_primary_access() { if blockstore.is_primary_access() {
@ -987,6 +963,7 @@ pub mod tests {
system_transaction, system_transaction,
transaction::{Transaction, TransactionError}, transaction::{Transaction, TransactionError},
}; };
use solana_vote_program::vote_transaction;
use std::{collections::BTreeSet, sync::RwLock}; use std::{collections::BTreeSet, sync::RwLock};
#[test] #[test]
@ -2532,7 +2509,6 @@ pub mod tests {
&opts, &opts,
&recyclers, &recyclers,
None, None,
None,
) )
.unwrap(); .unwrap();
@ -2760,7 +2736,7 @@ pub mod tests {
let ( let (
TransactionResults { TransactionResults {
fee_collection_results, fee_collection_results,
processing_results: _, ..
}, },
_balances, _balances,
) = batch ) = batch
@ -2840,9 +2816,9 @@ pub mod tests {
}) })
.collect(); .collect();
let entry = next_entry(&bank_1_blockhash, 1, vote_txs); let entry = next_entry(&bank_1_blockhash, 1, vote_txs);
let (replay_votes_sender, replay_votes_receiver) = unbounded(); let (replay_vote_sender, replay_vote_receiver) = unbounded();
let _ = process_entries(&bank1, &[entry], true, None, Some(&replay_votes_sender)); let _ = process_entries(&bank1, &[entry], true, None, Some(&replay_vote_sender));
let successes: BTreeSet<Pubkey> = replay_votes_receiver let successes: BTreeSet<Pubkey> = replay_vote_receiver
.try_iter() .try_iter()
.map(|(vote_pubkey, _, _)| vote_pubkey) .map(|(vote_pubkey, _, _)| vote_pubkey)
.collect(); .collect();

View File

@ -250,6 +250,16 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.7.3" version = "0.7.3"
@ -1840,6 +1850,7 @@ dependencies = [
"bv", "bv",
"byteorder 1.3.4", "byteorder 1.3.4",
"bzip2", "bzip2",
"crossbeam-channel",
"dir-diff", "dir-diff",
"flate2", "flate2",
"fnv", "fnv",

View File

@ -13,6 +13,7 @@ bincode = "1.3.1"
bv = { version = "0.11.1", features = ["serde"] } bv = { version = "0.11.1", features = ["serde"] }
byteorder = "1.3.4" byteorder = "1.3.4"
bzip2 = "0.3.3" bzip2 = "0.3.3"
crossbeam-channel = "0.4"
dir-diff = "0.3.2" dir-diff = "0.3.2"
flate2 = "1.0.14" flate2 = "1.0.14"
fnv = "1.0.7" fnv = "1.0.7"

View File

@ -12,7 +12,7 @@ fn bench_ordered_iterator_with_order_shuffling(bencher: &mut Bencher) {
bencher.iter(|| { bencher.iter(|| {
let mut order: Vec<usize> = (0..100_usize).collect(); let mut order: Vec<usize> = (0..100_usize).collect();
order.shuffle(&mut thread_rng()); order.shuffle(&mut thread_rng());
let _ordered_iterator_resp: Vec<&usize> = let _ordered_iterator_resp: Vec<(usize, &usize)> =
OrderedIterator::new(&vec, Some(&order)).collect(); OrderedIterator::new(&vec, Some(&order)).collect();
}); });
} }

View File

@ -281,7 +281,7 @@ impl Accounts {
OrderedIterator::new(txs, txs_iteration_order) OrderedIterator::new(txs, txs_iteration_order)
.zip(lock_results.into_iter()) .zip(lock_results.into_iter())
.map(|etx| match etx { .map(|etx| match etx {
(tx, (Ok(()), hash_age_kind)) => { ((_, tx), (Ok(()), hash_age_kind)) => {
let fee_calculator = match hash_age_kind.as_ref() { let fee_calculator = match hash_age_kind.as_ref() {
Some(HashAgeKind::DurableNonce(_, account)) => { Some(HashAgeKind::DurableNonce(_, account)) => {
nonce_utils::fee_calculator_of(account) nonce_utils::fee_calculator_of(account)
@ -612,7 +612,7 @@ impl Accounts {
) -> Vec<Result<()>> { ) -> Vec<Result<()>> {
use solana_sdk::sanitize::Sanitize; use solana_sdk::sanitize::Sanitize;
let keys: Vec<Result<_>> = OrderedIterator::new(txs, txs_iteration_order) let keys: Vec<Result<_>> = OrderedIterator::new(txs, txs_iteration_order)
.map(|tx| { .map(|(_, tx)| {
tx.sanitize().map_err(TransactionError::from)?; tx.sanitize().map_err(TransactionError::from)?;
if Self::has_duplicates(&tx.message.account_keys) { if Self::has_duplicates(&tx.message.account_keys) {
@ -645,7 +645,7 @@ impl Accounts {
OrderedIterator::new(txs, txs_iteration_order) OrderedIterator::new(txs, txs_iteration_order)
.zip(results.iter()) .zip(results.iter())
.for_each(|(tx, result)| self.unlock_account(tx, result, &mut account_locks)); .for_each(|((_, tx), result)| self.unlock_account(tx, result, &mut account_locks));
} }
/// Store the accounts into the DB /// Store the accounts into the DB
@ -697,7 +697,7 @@ impl Accounts {
fix_recent_blockhashes_sysvar_delay: bool, fix_recent_blockhashes_sysvar_delay: bool,
) -> Vec<(&'a Pubkey, &'a Account)> { ) -> Vec<(&'a Pubkey, &'a Account)> {
let mut accounts = Vec::with_capacity(loaded.len()); let mut accounts = Vec::with_capacity(loaded.len());
for (i, ((raccs, _hash_age_kind), tx)) in loaded for (i, ((raccs, _hash_age_kind), (_, tx))) in loaded
.iter_mut() .iter_mut()
.zip(OrderedIterator::new(txs, txs_iteration_order)) .zip(OrderedIterator::new(txs, txs_iteration_order))
.enumerate() .enumerate()

View File

@ -175,11 +175,18 @@ pub type TransactionProcessResult = (Result<()>, Option<HashAgeKind>);
pub struct TransactionResults { pub struct TransactionResults {
pub fee_collection_results: Vec<Result<()>>, pub fee_collection_results: Vec<Result<()>>,
pub processing_results: Vec<TransactionProcessResult>, pub processing_results: Vec<TransactionProcessResult>,
pub overwritten_vote_accounts: Vec<OverwrittenVoteAccount>,
} }
pub struct TransactionBalancesSet { pub struct TransactionBalancesSet {
pub pre_balances: TransactionBalances, pub pre_balances: TransactionBalances,
pub post_balances: TransactionBalances, pub post_balances: TransactionBalances,
} }
pub struct OverwrittenVoteAccount {
pub account: Account,
pub transaction_index: usize,
pub transaction_result_index: usize,
}
impl TransactionBalancesSet { impl TransactionBalancesSet {
pub fn new(pre_balances: TransactionBalances, post_balances: TransactionBalances) -> Self { pub fn new(pre_balances: TransactionBalances, post_balances: TransactionBalances) -> Self {
assert_eq!(pre_balances.len(), post_balances.len()); assert_eq!(pre_balances.len(), post_balances.len());
@ -1276,7 +1283,7 @@ impl Bank {
res: &[TransactionProcessResult], res: &[TransactionProcessResult],
) { ) {
let mut status_cache = self.src.status_cache.write().unwrap(); let mut status_cache = self.src.status_cache.write().unwrap();
for (i, tx) in OrderedIterator::new(txs, iteration_order).enumerate() { for (i, (_, tx)) in OrderedIterator::new(txs, iteration_order).enumerate() {
let (res, _hash_age_kind) = &res[i]; let (res, _hash_age_kind) = &res[i];
if Self::can_commit(res) && !tx.signatures.is_empty() { if Self::can_commit(res) && !tx.signatures.is_empty() {
status_cache.insert( status_cache.insert(
@ -1422,7 +1429,7 @@ impl Bank {
let hash_queue = self.blockhash_queue.read().unwrap(); let hash_queue = self.blockhash_queue.read().unwrap();
OrderedIterator::new(txs, iteration_order) OrderedIterator::new(txs, iteration_order)
.zip(lock_results.into_iter()) .zip(lock_results.into_iter())
.map(|(tx, lock_res)| match lock_res { .map(|((_, tx), lock_res)| match lock_res {
Ok(()) => { Ok(()) => {
let message = tx.message(); let message = tx.message();
let hash_age = hash_queue.check_hash_age(&message.recent_blockhash, max_age); let hash_age = hash_queue.check_hash_age(&message.recent_blockhash, max_age);
@ -1452,7 +1459,7 @@ impl Bank {
let rcache = self.src.status_cache.read().unwrap(); let rcache = self.src.status_cache.read().unwrap();
OrderedIterator::new(txs, iteration_order) OrderedIterator::new(txs, iteration_order)
.zip(lock_results.into_iter()) .zip(lock_results.into_iter())
.map(|(tx, lock_res)| { .map(|((_, tx), lock_res)| {
if tx.signatures.is_empty() { if tx.signatures.is_empty() {
return lock_res; return lock_res;
} }
@ -1487,7 +1494,7 @@ impl Bank {
) -> Vec<TransactionProcessResult> { ) -> Vec<TransactionProcessResult> {
OrderedIterator::new(txs, iteration_order) OrderedIterator::new(txs, iteration_order)
.zip(lock_results.into_iter()) .zip(lock_results.into_iter())
.map(|(tx, lock_res)| { .map(|((_, tx), lock_res)| {
if lock_res.0.is_ok() { if lock_res.0.is_ok() {
if tx.message.instructions.len() == 1 { if tx.message.instructions.len() == 1 {
let instruction = &tx.message.instructions[0]; let instruction = &tx.message.instructions[0];
@ -1579,7 +1586,8 @@ impl Bank {
pub fn collect_balances(&self, batch: &TransactionBatch) -> TransactionBalances { pub fn collect_balances(&self, batch: &TransactionBatch) -> TransactionBalances {
let mut balances: TransactionBalances = vec![]; let mut balances: TransactionBalances = vec![];
for transaction in OrderedIterator::new(batch.transactions(), batch.iteration_order()) { for (_, transaction) in OrderedIterator::new(batch.transactions(), batch.iteration_order())
{
let mut transaction_balances: Vec<u64> = vec![]; let mut transaction_balances: Vec<u64> = vec![];
for account_key in transaction.message.account_keys.iter() { for account_key in transaction.message.account_keys.iter() {
transaction_balances.push(self.get_balance(account_key)); transaction_balances.push(self.get_balance(account_key));
@ -1728,7 +1736,7 @@ impl Bank {
let retryable_txs: Vec<_> = let retryable_txs: Vec<_> =
OrderedIterator::new(batch.lock_results(), batch.iteration_order()) OrderedIterator::new(batch.lock_results(), batch.iteration_order())
.enumerate() .enumerate()
.filter_map(|(index, res)| match res { .filter_map(|(index, (_, res))| match res {
Err(TransactionError::AccountInUse) => { Err(TransactionError::AccountInUse) => {
error_counters.account_in_use += 1; error_counters.account_in_use += 1;
Some(index) Some(index)
@ -1758,7 +1766,7 @@ impl Bank {
let executed: Vec<TransactionProcessResult> = loaded_accounts let executed: Vec<TransactionProcessResult> = loaded_accounts
.iter_mut() .iter_mut()
.zip(OrderedIterator::new(txs, batch.iteration_order())) .zip(OrderedIterator::new(txs, batch.iteration_order()))
.map(|(accs, tx)| match accs { .map(|(accs, (_, tx))| match accs {
(Err(e), hash_age_kind) => (Err(e.clone()), hash_age_kind.clone()), (Err(e), hash_age_kind) => (Err(e.clone()), hash_age_kind.clone()),
(Ok((accounts, loaders, _rents)), hash_age_kind) => { (Ok((accounts, loaders, _rents)), hash_age_kind) => {
signature_count += u64::from(tx.message().header.num_required_signatures); signature_count += u64::from(tx.message().header.num_required_signatures);
@ -1837,7 +1845,7 @@ impl Bank {
let mut fees = 0; let mut fees = 0;
let results = OrderedIterator::new(txs, iteration_order) let results = OrderedIterator::new(txs, iteration_order)
.zip(executed.iter()) .zip(executed.iter())
.map(|(tx, (res, hash_age_kind))| { .map(|((_, tx), (res, hash_age_kind))| {
let (fee_calculator, is_durable_nonce) = match hash_age_kind { let (fee_calculator, is_durable_nonce) = match hash_age_kind {
Some(HashAgeKind::DurableNonce(_, account)) => { Some(HashAgeKind::DurableNonce(_, account)) => {
(nonce_utils::fee_calculator_of(account), true) (nonce_utils::fee_calculator_of(account), true)
@ -1921,7 +1929,8 @@ impl Bank {
); );
self.collect_rent(executed, loaded_accounts); self.collect_rent(executed, loaded_accounts);
self.update_cached_accounts(txs, iteration_order, executed, loaded_accounts); let overwritten_vote_accounts =
self.update_cached_accounts(txs, iteration_order, executed, loaded_accounts);
// once committed there is no way to unroll // once committed there is no way to unroll
write_time.stop(); write_time.stop();
@ -1929,9 +1938,11 @@ impl Bank {
self.update_transaction_statuses(txs, iteration_order, &executed); self.update_transaction_statuses(txs, iteration_order, &executed);
let fee_collection_results = let fee_collection_results =
self.filter_program_errors_and_collect_fee(txs, iteration_order, executed); self.filter_program_errors_and_collect_fee(txs, iteration_order, executed);
TransactionResults { TransactionResults {
fee_collection_results, fee_collection_results,
processing_results: executed.to_vec(), processing_results: executed.to_vec(),
overwritten_vote_accounts,
} }
} }
@ -2866,8 +2877,9 @@ impl Bank {
iteration_order: Option<&[usize]>, iteration_order: Option<&[usize]>,
res: &[TransactionProcessResult], res: &[TransactionProcessResult],
loaded: &[(Result<TransactionLoadResult>, Option<HashAgeKind>)], loaded: &[(Result<TransactionLoadResult>, Option<HashAgeKind>)],
) { ) -> Vec<OverwrittenVoteAccount> {
for (i, ((raccs, _load_hash_age_kind), tx)) in loaded let mut overwritten_vote_accounts = vec![];
for (i, ((raccs, _load_hash_age_kind), (transaction_index, tx))) in loaded
.iter() .iter()
.zip(OrderedIterator::new(txs, iteration_order)) .zip(OrderedIterator::new(txs, iteration_order))
.enumerate() .enumerate()
@ -2887,10 +2899,20 @@ impl Bank {
.filter(|(_key, account)| (Stakes::is_stake(account))) .filter(|(_key, account)| (Stakes::is_stake(account)))
{ {
if Stakes::is_stake(account) { if Stakes::is_stake(account) {
self.stakes.write().unwrap().store(pubkey, account); if let Some(old_vote_account) =
self.stakes.write().unwrap().store(pubkey, account)
{
overwritten_vote_accounts.push(OverwrittenVoteAccount {
account: old_vote_account,
transaction_index,
transaction_result_index: i,
});
}
} }
} }
} }
overwritten_vote_accounts
} }
/// current stake delegations for this bank /// current stake delegations for this bank

View File

@ -1,8 +1,10 @@
use crate::{ use crate::{
bank::Bank, bank::{Bank, TransactionResults},
genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs},
vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{pubkey::Pubkey, signature::Signer}; use solana_sdk::{pubkey::Pubkey, signature::Signer, transaction::Transaction};
use solana_vote_program::vote_transaction;
pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Bank, Vec<Pubkey>) { pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Bank, Vec<Pubkey>) {
// Create some voters at genesis // Create some voters at genesis
@ -23,3 +25,28 @@ pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Ban
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
(bank, vote_pubkeys) (bank, vote_pubkeys)
} }
pub fn find_and_send_votes(
txs: &[Transaction],
tx_results: &TransactionResults,
vote_sender: Option<&ReplayVoteSender>,
) {
let TransactionResults {
processing_results,
overwritten_vote_accounts,
..
} = tx_results;
if let Some(vote_sender) = vote_sender {
for old_account in overwritten_vote_accounts {
assert!(processing_results[old_account.transaction_result_index]
.0
.is_ok());
let transaction = &txs[old_account.transaction_index];
if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) {
if parsed_vote.1.slots.last().is_some() {
let _ = vote_sender.send(parsed_vote);
}
}
}
}
}

View File

@ -29,6 +29,7 @@ pub mod status_cache;
mod system_instruction_processor; mod system_instruction_processor;
pub mod transaction_batch; pub mod transaction_batch;
pub mod transaction_utils; pub mod transaction_utils;
pub mod vote_sender_types;
extern crate solana_config_program; extern crate solana_config_program;
extern crate solana_stake_program; extern crate solana_stake_program;

View File

@ -105,20 +105,18 @@ impl Stakes {
&& account.data.len() >= std::mem::size_of::<StakeState>() && account.data.len() >= std::mem::size_of::<StakeState>()
} }
pub fn store(&mut self, pubkey: &Pubkey, account: &Account) { pub fn store(&mut self, pubkey: &Pubkey, account: &Account) -> Option<Account> {
if solana_vote_program::check_id(&account.owner) { if solana_vote_program::check_id(&account.owner) {
if account.lamports == 0 { let old = self.vote_accounts.remove(pubkey);
self.vote_accounts.remove(pubkey); if account.lamports != 0 {
} else { let stake = old.as_ref().map_or_else(
let old = self.vote_accounts.get(pubkey);
let stake = old.map_or_else(
|| self.calculate_stake(pubkey, self.epoch, Some(&self.stake_history)), || self.calculate_stake(pubkey, self.epoch, Some(&self.stake_history)),
|v| v.0, |v| v.0,
); );
self.vote_accounts.insert(*pubkey, (stake, account.clone())); self.vote_accounts.insert(*pubkey, (stake, account.clone()));
} }
old.map(|(_, account)| account)
} else if solana_stake_program::check_id(&account.owner) { } else if solana_stake_program::check_id(&account.owner) {
// old_stake is stake lamports and voter_pubkey from the pre-store() version // old_stake is stake lamports and voter_pubkey from the pre-store() version
let old_stake = self.stake_delegations.get(pubkey).map(|delegation| { let old_stake = self.stake_delegations.get(pubkey).map(|delegation| {
@ -160,6 +158,9 @@ impl Stakes {
} else if let Some(delegation) = delegation { } else if let Some(delegation) = delegation {
self.stake_delegations.insert(*pubkey, delegation); self.stake_delegations.insert(*pubkey, delegation);
} }
None
} else {
None
} }
} }

View File

@ -21,7 +21,7 @@ impl<'a, T> OrderedIterator<'a, T> {
} }
impl<'a, T> Iterator for OrderedIterator<'a, T> { impl<'a, T> Iterator for OrderedIterator<'a, T> {
type Item = &'a T; type Item = (usize, &'a T);
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.current >= self.vec.len() { if self.current >= self.vec.len() {
None None
@ -33,7 +33,7 @@ impl<'a, T> Iterator for OrderedIterator<'a, T> {
index = self.current; index = self.current;
} }
self.current += 1; self.current += 1;
Some(self.vec.index(index)) Some((index, self.vec.index(index)))
} }
} }
} }
@ -42,17 +42,22 @@ impl<'a, T> Iterator for OrderedIterator<'a, T> {
mod tests { mod tests {
use super::*; use super::*;
type IteratorResponse<'a> = Vec<(((usize, &'a usize), &'a usize), usize)>;
#[test] #[test]
fn test_ordered_iterator_custom_order() { fn test_ordered_iterator_custom_order() {
let vec: Vec<usize> = vec![1, 2, 3, 4]; let vec: Vec<usize> = vec![1, 2, 3, 4];
let custom_order: Vec<usize> = vec![3, 1, 0, 2]; let custom_order: Vec<usize> = vec![3, 1, 0, 2];
let custom_order_ = custom_order.clone();
let ordered_iterator = OrderedIterator::new(&vec, Some(&custom_order)); let ordered_iterator = OrderedIterator::new(&vec, Some(&custom_order));
let expected_response: Vec<usize> = vec![4, 2, 1, 3]; let expected_response: Vec<usize> = vec![4, 2, 1, 3];
let resp: Vec<(&usize, &usize)> = ordered_iterator let resp: IteratorResponse = ordered_iterator
.zip(expected_response.iter()) .zip(expected_response.iter())
.filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem) .zip(custom_order_)
.filter(|(((index, actual_elem), expected_elem), expected_index)| {
*actual_elem == *expected_elem && index == expected_index
})
.collect(); .collect();
assert_eq!(resp.len(), custom_order.len()); assert_eq!(resp.len(), custom_order.len());
@ -63,9 +68,12 @@ mod tests {
let vec: Vec<usize> = vec![1, 2, 3, 4]; let vec: Vec<usize> = vec![1, 2, 3, 4];
let ordered_iterator = OrderedIterator::new(&vec, None); let ordered_iterator = OrderedIterator::new(&vec, None);
let resp: Vec<(&usize, &usize)> = ordered_iterator let resp: IteratorResponse = ordered_iterator
.zip(vec.iter()) .zip(vec.iter())
.filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem) .zip(0..=4)
.filter(|(((index, actual_elem), expected_elem), expected_index)| {
*actual_elem == *expected_elem && index == expected_index
})
.collect(); .collect();
assert_eq!(resp.len(), vec.len()); assert_eq!(resp.len(), vec.len());

View File

@ -0,0 +1,7 @@
use crossbeam_channel::{Receiver, Sender};
use solana_sdk::{hash::Hash, pubkey::Pubkey};
use solana_vote_program::vote_state::Vote;
pub type ReplayedVote = (Pubkey, Vote, Option<Hash>);
pub type ReplayVoteSender = Sender<ReplayedVote>;
pub type ReplayVoteReceiver = Receiver<ReplayedVote>;