From 4c4f7905b1c896c18518c89f6e9cd8152a6d2838 Mon Sep 17 00:00:00 2001 From: Tyera Date: Tue, 23 May 2023 19:48:41 -0600 Subject: [PATCH] Send messages to EntryNotifierService from blockstore_processor (#31305) * Rename variable to disabiguate tx indexes from entry indexes * Send entry notifications from blockstore_processor * Escalate log for send failure to WARN --- entry/src/entry.rs | 4 +-- ledger/src/blockstore_processor.rs | 46 +++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 8d7304b7d2..0551abe02a 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -152,8 +152,8 @@ pub struct EntrySummary { pub num_transactions: u64, } -impl From for EntrySummary { - fn from(entry: Entry) -> Self { +impl From<&Entry> for EntrySummary { + fn from(entry: &Entry) -> Self { Self { num_hashes: entry.num_hashes, hash: entry.hash, diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index b81212be87..48a726701c 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1,8 +1,12 @@ use { crate::{ - block_error::BlockError, blockstore::Blockstore, blockstore_db::BlockstoreError, - blockstore_meta::SlotMeta, entry_notifier_service::EntryNotifierSender, - leader_schedule_cache::LeaderScheduleCache, token_balances::collect_token_balances, + block_error::BlockError, + blockstore::Blockstore, + blockstore_db::BlockstoreError, + blockstore_meta::SlotMeta, + entry_notifier_service::{EntryNotification, EntryNotifierSender}, + leader_schedule_cache::LeaderScheduleCache, + token_balances::collect_token_balances, }, chrono_humanize::{Accuracy, HumanTime, Tense}, crossbeam_channel::Sender, @@ -1109,7 +1113,7 @@ fn confirm_slot_entries( progress: &mut ConfirmationProgress, skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, - _entry_notification_sender: Option<&EntryNotifierSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, @@ -1132,15 +1136,29 @@ fn confirm_slot_entries( let slot = bank.slot(); let (entries, num_shreds, slot_full) = slot_entries_load_result; let num_entries = entries.len(); - let mut entry_starting_indexes = Vec::with_capacity(num_entries); - let mut entry_starting_index = progress.num_txs; + let mut entry_tx_starting_indexes = Vec::with_capacity(num_entries); + let mut entry_tx_starting_index = progress.num_txs; let num_txs = entries .iter() - .map(|e| { - let num_txs = e.transactions.len(); - let next_starting_index = entry_starting_index.saturating_add(num_txs); - entry_starting_indexes.push(entry_starting_index); - entry_starting_index = next_starting_index; + .enumerate() + .map(|(i, entry)| { + if let Some(entry_notification_sender) = entry_notification_sender { + let entry_index = progress.num_entries.saturating_add(i); + if let Err(err) = entry_notification_sender.send(EntryNotification { + slot, + index: entry_index, + entry: entry.into(), + }) { + warn!( + "Slot {}, entry {} entry_notification_sender send failed: {:?}", + slot, entry_index, err + ); + } + } + let num_txs = entry.transactions.len(); + let next_tx_starting_index = entry_tx_starting_index.saturating_add(num_txs); + entry_tx_starting_indexes.push(entry_tx_starting_index); + entry_tx_starting_index = next_tx_starting_index; num_txs }) .sum::(); @@ -1222,10 +1240,10 @@ fn confirm_slot_entries( let mut replay_timer = Measure::start("replay_elapsed"); let mut replay_entries: Vec<_> = entries .into_iter() - .zip(entry_starting_indexes) - .map(|(entry, starting_index)| ReplayEntry { + .zip(entry_tx_starting_indexes) + .map(|(entry, tx_starting_index)| ReplayEntry { entry, - starting_index, + starting_index: tx_starting_index, }) .collect(); // Note: This will shuffle entries' transactions in-place.