From 3c71670bd9e3d3d8ef1ae3a05c71dc8b09b4f5cd Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Fri, 13 Aug 2021 15:05:18 -0400
Subject: [PATCH] returns completed-data-set-info from insert_data_shred

instead of opaque (u32, u32) pairs, which are then converted to
CompletedDataSetInfo at the call-site.
---
 ledger/src/blockstore.rs | 150 ++++++++++++++++++---------------------
 1 file changed, 71 insertions(+), 79 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index f1eb114c37..e313a1f4f7 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1,68 +1,69 @@
 //! The `blockstore` module provides functions for parallel verification of the
 //! Proof of History ledger as well as iterative read, append write, and random
 //! access read to a persistent file-based ledger.
-use crate::{
-    ancestor_iterator::AncestorIterator,
-    blockstore_db::{
-        columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
-        IteratorMode, LedgerColumn, Result, WriteBatch,
-    },
-    blockstore_meta::*,
-    erasure::ErasureConfig,
-    leader_schedule_cache::LeaderScheduleCache,
-    next_slots_iterator::NextSlotsIterator,
-    shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
-};
 pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
-use bincode::deserialize;
-use log::*;
-use rayon::{
-    iter::{IntoParallelRefIterator, ParallelIterator},
-    ThreadPool,
-};
-use rocksdb::DBRawIterator;
-use solana_entry::entry::{create_ticks, Entry};
-use solana_measure::measure::Measure;
-use solana_metrics::{datapoint_debug, datapoint_error};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE};
-use solana_sdk::{
-    clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
-    genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
-    hash::Hash,
-    pubkey::Pubkey,
-    sanitize::Sanitize,
-    signature::{Keypair, Signature, Signer},
-    timing::timestamp,
-    transaction::Transaction,
-};
-use solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta};
-use solana_transaction_status::{
-    ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
-    TransactionStatusMeta, TransactionWithStatusMeta,
-};
-use std::{
-    borrow::Cow,
-    cell::RefCell,
-    cmp,
-    collections::{BTreeMap, HashMap, HashSet},
-    convert::TryInto,
-    fs,
-    io::{Error as IoError, ErrorKind},
-    path::{Path, PathBuf},
-    rc::Rc,
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
-        Arc, Mutex, RwLock, RwLockWriteGuard,
+use {
+    crate::{
+        ancestor_iterator::AncestorIterator,
+        blockstore_db::{
+            columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
+            IteratorMode, LedgerColumn, Result, WriteBatch,
+        },
+        blockstore_meta::*,
+        erasure::ErasureConfig,
+        leader_schedule_cache::LeaderScheduleCache,
+        next_slots_iterator::NextSlotsIterator,
+        shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
     },
-    time::Instant,
+    bincode::deserialize,
+    log::*,
+    rayon::{
+        iter::{IntoParallelRefIterator, ParallelIterator},
+        ThreadPool,
+    },
+    rocksdb::DBRawIterator,
+    solana_entry::entry::{create_ticks, Entry},
+    solana_measure::measure::Measure,
+    solana_metrics::{datapoint_debug, datapoint_error},
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
+    solana_sdk::{
+        clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
+        genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
+        hash::Hash,
+        pubkey::Pubkey,
+        sanitize::Sanitize,
+        signature::{Keypair, Signature, Signer},
+        timing::timestamp,
+        transaction::Transaction,
+    },
+    solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta},
+    solana_transaction_status::{
+        ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
+        TransactionStatusMeta, TransactionWithStatusMeta,
+    },
+    std::{
+        borrow::Cow,
+        cell::RefCell,
+        cmp,
+        collections::{BTreeMap, HashMap, HashSet},
+        convert::TryInto,
+        fs,
+        io::{Error as IoError, ErrorKind},
+        path::{Path, PathBuf},
+        rc::Rc,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
+            Arc, Mutex, RwLock, RwLockWriteGuard,
+        },
+        time::Instant,
+    },
+    tempfile::TempDir,
+    thiserror::Error,
+    trees::{Tree, TreeWalk},
 };
-use tempfile::TempDir;
-use thiserror::Error;
-use trees::{Tree, TreeWalk};
-
 
 pub mod blockstore_purge;
 
 pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";
@@ -833,7 +834,6 @@ impl Blockstore {
             .enumerate()
             .for_each(|(i, (shred, is_repaired))| {
                 if shred.is_data() {
-                    let shred_slot = shred.slot();
                     let shred_source = if is_repaired {
                         ShredSource::Repaired
                     } else {
@@ -852,13 +852,7 @@ impl Blockstore {
                         leader_schedule,
                         shred_source,
                     ) {
-                        newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
-                            |(start_index, end_index)| CompletedDataSetInfo {
-                                slot: shred_slot,
-                                start_index,
-                                end_index,
-                            },
-                        ));
+                        newly_completed_data_sets.extend(completed_data_sets);
                         inserted_indices.push(i);
                         num_inserted += 1;
                     }
@@ -898,7 +892,6 @@ impl Blockstore {
             num_recovered = recovered_data.len();
             recovered_data.into_iter().for_each(|shred| {
                 if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
-                    let shred_slot = shred.slot();
                     if shred.verify(&leader) {
                         match self.check_insert_data_shred(
                             shred,
@@ -921,15 +914,7 @@ impl Blockstore {
                             }
                             Err(InsertDataShredError::BlockstoreError(_)) => {}
                             Ok(completed_data_sets) => {
-                                newly_completed_data_sets.extend(
-                                    completed_data_sets.into_iter().map(
-                                        |(start_index, end_index)| CompletedDataSetInfo {
-                                            slot: shred_slot,
-                                            start_index,
-                                            end_index,
-                                        },
-                                    ),
-                                );
+                                newly_completed_data_sets.extend(completed_data_sets);
                                 num_recovered_inserted += 1;
                             }
                         }
@@ -1225,7 +1210,7 @@ impl Blockstore {
         handle_duplicate: &F,
         leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
-    ) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
+    ) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError>
     where
         F: Fn(Shred),
     {
@@ -1491,7 +1476,7 @@ impl Blockstore {
         shred: &Shred,
         write_batch: &mut WriteBatch,
         shred_source: ShredSource,
-    ) -> Result<Vec<(u32, u32)>> {
+    ) -> Result<Vec<CompletedDataSetInfo>> {
         let slot = shred.slot();
         let index = u64::from(shred.index());
 
@@ -1540,7 +1525,14 @@ impl Blockstore {
             new_consumed,
             shred.reference_tick(),
             data_index,
-        );
+        )
+        .into_iter()
+        .map(|(start_index, end_index)| CompletedDataSetInfo {
+            slot,
+            start_index,
+            end_index,
+        })
+        .collect();
         if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered {
             let mut slots_stats = self.slots_stats.lock().unwrap();
            let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();
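
For readers outside the Solana codebase, the shape of this refactor can be seen in a minimal, self-contained sketch. Everything below is illustrative, not the real solana-ledger code: `u64` stands in for `solana_sdk::clock::Slot`, and `update_completed_data_indexes` is stubbed to return fixed ranges instead of consulting the shred index.

// Mirrors the fields of CompletedDataSetInfo as used in the diff above.
#[derive(Debug, PartialEq)]
pub struct CompletedDataSetInfo {
    pub slot: u64,
    pub start_index: u32,
    pub end_index: u32,
}

// Stand-in for the real function, which reports the (start_index, end_index)
// pairs of data sets completed by an insertion.
fn update_completed_data_indexes() -> Vec<(u32, u32)> {
    vec![(0, 31), (32, 63)]
}

// After the patch: the insertion path converts the opaque pairs into
// CompletedDataSetInfo itself, tagging each range with the shred's slot.
fn insert_data_shred(slot: u64) -> Vec<CompletedDataSetInfo> {
    update_completed_data_indexes()
        .into_iter()
        .map(|(start_index, end_index)| CompletedDataSetInfo {
            slot,
            start_index,
            end_index,
        })
        .collect()
}

fn main() {
    let mut newly_completed_data_sets = Vec::new();
    // Call-sites no longer capture `shred_slot` and build the struct by hand;
    // a plain `extend` suffices.
    newly_completed_data_sets.extend(insert_data_shred(42));
    assert_eq!(
        newly_completed_data_sets,
        vec![
            CompletedDataSetInfo { slot: 42, start_index: 0, end_index: 31 },
            CompletedDataSetInfo { slot: 42, start_index: 32, end_index: 63 },
        ],
    );
}

The design point of the patch follows directly: both insertion paths (freshly received shreds and erasure-recovered shreds) previously duplicated the conversion closure, each capturing its own `shred_slot`; moving the conversion into `insert_data_shred` centralizes the slot-to-range association in one place.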