Send messages to EntryNotifierService from blockstore_processor (#31305)

* Rename variable to disabiguate tx indexes from entry indexes

* Send entry notifications from blockstore_processor

* Escalate log for send failure to WARN
This commit is contained in:
Tyera 2023-05-23 19:48:41 -06:00 committed by GitHub
parent fa4c6aa015
commit 4c4f7905b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 16 deletions

View File

@ -152,8 +152,8 @@ pub struct EntrySummary {
pub num_transactions: u64,
}
impl From<Entry> for EntrySummary {
fn from(entry: Entry) -> Self {
impl From<&Entry> for EntrySummary {
fn from(entry: &Entry) -> Self {
Self {
num_hashes: entry.num_hashes,
hash: entry.hash,

View File

@ -1,8 +1,12 @@
use {
crate::{
block_error::BlockError, blockstore::Blockstore, blockstore_db::BlockstoreError,
blockstore_meta::SlotMeta, entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache, token_balances::collect_token_balances,
block_error::BlockError,
blockstore::Blockstore,
blockstore_db::BlockstoreError,
blockstore_meta::SlotMeta,
entry_notifier_service::{EntryNotification, EntryNotifierSender},
leader_schedule_cache::LeaderScheduleCache,
token_balances::collect_token_balances,
},
chrono_humanize::{Accuracy, HumanTime, Tense},
crossbeam_channel::Sender,
@ -1109,7 +1113,7 @@ fn confirm_slot_entries(
progress: &mut ConfirmationProgress,
skip_verification: bool,
transaction_status_sender: Option<&TransactionStatusSender>,
_entry_notification_sender: Option<&EntryNotifierSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
@ -1132,15 +1136,29 @@ fn confirm_slot_entries(
let slot = bank.slot();
let (entries, num_shreds, slot_full) = slot_entries_load_result;
let num_entries = entries.len();
let mut entry_starting_indexes = Vec::with_capacity(num_entries);
let mut entry_starting_index = progress.num_txs;
let mut entry_tx_starting_indexes = Vec::with_capacity(num_entries);
let mut entry_tx_starting_index = progress.num_txs;
let num_txs = entries
.iter()
.map(|e| {
let num_txs = e.transactions.len();
let next_starting_index = entry_starting_index.saturating_add(num_txs);
entry_starting_indexes.push(entry_starting_index);
entry_starting_index = next_starting_index;
.enumerate()
.map(|(i, entry)| {
if let Some(entry_notification_sender) = entry_notification_sender {
let entry_index = progress.num_entries.saturating_add(i);
if let Err(err) = entry_notification_sender.send(EntryNotification {
slot,
index: entry_index,
entry: entry.into(),
}) {
warn!(
"Slot {}, entry {} entry_notification_sender send failed: {:?}",
slot, entry_index, err
);
}
}
let num_txs = entry.transactions.len();
let next_tx_starting_index = entry_tx_starting_index.saturating_add(num_txs);
entry_tx_starting_indexes.push(entry_tx_starting_index);
entry_tx_starting_index = next_tx_starting_index;
num_txs
})
.sum::<usize>();
@ -1222,10 +1240,10 @@ fn confirm_slot_entries(
let mut replay_timer = Measure::start("replay_elapsed");
let mut replay_entries: Vec<_> = entries
.into_iter()
.zip(entry_starting_indexes)
.map(|(entry, starting_index)| ReplayEntry {
.zip(entry_tx_starting_indexes)
.map(|(entry, tx_starting_index)| ReplayEntry {
entry,
starting_index,
starting_index: tx_starting_index,
})
.collect();
// Note: This will shuffle entries' transactions in-place.