returns completed-data-set-info from insert_data_shred

instead of opaque (u32, u32) index pairs, which were previously
converted to CompletedDataSetInfo at each call-site.
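In effect, the conversion moves from each call-site into insert_data_shred itself, where the shred's slot is already in scope. A minimal sketch of the shape of the change, assuming only what the diff below shows (the helper name convert_at_call_site is illustrative, not from the patch; Slot is u64 in solana_sdk):

// Sketch only, not the actual blockstore code.
type Slot = u64; // solana_sdk::clock::Slot

#[derive(Clone, Debug, PartialEq)]
pub struct CompletedDataSetInfo {
    pub slot: Slot,       // slot the completed data set belongs to
    pub start_index: u32, // index of the first data shred in the set
    pub end_index: u32,   // index of the last data shred in the set
}

// Before this commit, every call-site re-attached the slot to the opaque pairs:
fn convert_at_call_site(slot: Slot, completed: Vec<(u32, u32)>) -> Vec<CompletedDataSetInfo> {
    completed
        .into_iter()
        .map(|(start_index, end_index)| CompletedDataSetInfo {
            slot,
            start_index,
            end_index,
        })
        .collect()
}

After the patch, both call-sites reduce to newly_completed_data_sets.extend(completed_data_sets).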
behzad nouri 2021-08-13 15:05:18 -04:00
parent 3efccbffab
commit 3c71670bd9
1 changed file with 71 additions and 79 deletions


@@ -1,68 +1,69 @@
 //! The `blockstore` module provides functions for parallel verification of the
 //! Proof of History ledger as well as iterative read, append write, and random
 //! access read to a persistent file-based ledger.
-use crate::{
-    ancestor_iterator::AncestorIterator,
-    blockstore_db::{
-        columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
-        IteratorMode, LedgerColumn, Result, WriteBatch,
-    },
-    blockstore_meta::*,
-    erasure::ErasureConfig,
-    leader_schedule_cache::LeaderScheduleCache,
-    next_slots_iterator::NextSlotsIterator,
-    shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
-};
 pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
-use bincode::deserialize;
-use log::*;
-use rayon::{
-    iter::{IntoParallelRefIterator, ParallelIterator},
-    ThreadPool,
-};
-use rocksdb::DBRawIterator;
-use solana_entry::entry::{create_ticks, Entry};
-use solana_measure::measure::Measure;
-use solana_metrics::{datapoint_debug, datapoint_error};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE};
-use solana_sdk::{
-    clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
-    genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
-    hash::Hash,
-    pubkey::Pubkey,
-    sanitize::Sanitize,
-    signature::{Keypair, Signature, Signer},
-    timing::timestamp,
-    transaction::Transaction,
-};
-use solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta};
-use solana_transaction_status::{
-    ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
-    TransactionStatusMeta, TransactionWithStatusMeta,
-};
-use std::{
-    borrow::Cow,
-    cell::RefCell,
-    cmp,
-    collections::{BTreeMap, HashMap, HashSet},
-    convert::TryInto,
-    fs,
-    io::{Error as IoError, ErrorKind},
-    path::{Path, PathBuf},
-    rc::Rc,
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
-        Arc, Mutex, RwLock, RwLockWriteGuard,
-    },
-    time::Instant,
-};
-use tempfile::TempDir;
-use thiserror::Error;
-use trees::{Tree, TreeWalk};
+use {
+    crate::{
+        ancestor_iterator::AncestorIterator,
+        blockstore_db::{
+            columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
+            IteratorMode, LedgerColumn, Result, WriteBatch,
+        },
+        blockstore_meta::*,
+        erasure::ErasureConfig,
+        leader_schedule_cache::LeaderScheduleCache,
+        next_slots_iterator::NextSlotsIterator,
+        shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
+    },
+    bincode::deserialize,
+    log::*,
+    rayon::{
+        iter::{IntoParallelRefIterator, ParallelIterator},
+        ThreadPool,
+    },
+    rocksdb::DBRawIterator,
+    solana_entry::entry::{create_ticks, Entry},
+    solana_measure::measure::Measure,
+    solana_metrics::{datapoint_debug, datapoint_error},
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
+    solana_sdk::{
+        clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
+        genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
+        hash::Hash,
+        pubkey::Pubkey,
+        sanitize::Sanitize,
+        signature::{Keypair, Signature, Signer},
+        timing::timestamp,
+        transaction::Transaction,
+    },
+    solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta},
+    solana_transaction_status::{
+        ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
+        TransactionStatusMeta, TransactionWithStatusMeta,
+    },
+    std::{
+        borrow::Cow,
+        cell::RefCell,
+        cmp,
+        collections::{BTreeMap, HashMap, HashSet},
+        convert::TryInto,
+        fs,
+        io::{Error as IoError, ErrorKind},
+        path::{Path, PathBuf},
+        rc::Rc,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
+            Arc, Mutex, RwLock, RwLockWriteGuard,
+        },
+        time::Instant,
+    },
+    tempfile::TempDir,
+    thiserror::Error,
+    trees::{Tree, TreeWalk},
+};

 pub mod blockstore_purge;

 pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";
@@ -833,7 +834,6 @@ impl Blockstore {
            .enumerate()
            .for_each(|(i, (shred, is_repaired))| {
                if shred.is_data() {
-                   let shred_slot = shred.slot();
                    let shred_source = if is_repaired {
                        ShredSource::Repaired
                    } else {
@@ -852,13 +852,7 @@ impl Blockstore {
                        leader_schedule,
                        shred_source,
                    ) {
-                       newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
-                           |(start_index, end_index)| CompletedDataSetInfo {
-                               slot: shred_slot,
-                               start_index,
-                               end_index,
-                           },
-                       ));
+                       newly_completed_data_sets.extend(completed_data_sets);
                        inserted_indices.push(i);
                        num_inserted += 1;
                    }
@@ -898,7 +892,6 @@ impl Blockstore {
            num_recovered = recovered_data.len();
            recovered_data.into_iter().for_each(|shred| {
                if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
-                   let shred_slot = shred.slot();
                    if shred.verify(&leader) {
                        match self.check_insert_data_shred(
                            shred,
@@ -921,15 +914,7 @@ impl Blockstore {
                            }
                            Err(InsertDataShredError::BlockstoreError(_)) => {}
                            Ok(completed_data_sets) => {
-                               newly_completed_data_sets.extend(
-                                   completed_data_sets.into_iter().map(
-                                       |(start_index, end_index)| CompletedDataSetInfo {
-                                           slot: shred_slot,
-                                           start_index,
-                                           end_index,
-                                       },
-                                   ),
-                               );
+                               newly_completed_data_sets.extend(completed_data_sets);
                                num_recovered_inserted += 1;
                            }
                        }
@@ -1225,7 +1210,7 @@ impl Blockstore {
        handle_duplicate: &F,
        leader_schedule: Option<&LeaderScheduleCache>,
        shred_source: ShredSource,
-   ) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
+   ) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError>
    where
        F: Fn(Shred),
    {
@@ -1491,7 +1476,7 @@ impl Blockstore {
        shred: &Shred,
        write_batch: &mut WriteBatch,
        shred_source: ShredSource,
-   ) -> Result<Vec<(u32, u32)>> {
+   ) -> Result<Vec<CompletedDataSetInfo>> {
        let slot = shred.slot();
        let index = u64::from(shred.index());
@@ -1540,7 +1525,14 @@ impl Blockstore {
            new_consumed,
            shred.reference_tick(),
            data_index,
-       );
+       )
+       .into_iter()
+       .map(|(start_index, end_index)| CompletedDataSetInfo {
+           slot,
+           start_index,
+           end_index,
+       })
+       .collect();
        if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered {
            let mut slots_stats = self.slots_stats.lock().unwrap();
            let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();
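Note that update_slot_meta still yields raw (u32, u32) ranges; the final hunk converts them exactly once, using the slot binding already computed at the top of insert_data_shred. With the slot captured at the source, the let shred_slot = shred.slot(); temporaries at both call-sites become dead code, which is why the earlier hunks delete them.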