diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index cbd43162f..66318a7ee 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -3,7 +3,6 @@
 //! access read to a persistent file-based ledger.
 
 use crate::entry::Entry;
 use crate::erasure::ErasureConfig;
-use crate::packet::Blob;
 use crate::result::{Error, Result};
 use crate::shred::{Shred, Shredder};
@@ -72,8 +71,8 @@ pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
 
 #[derive(Debug)]
 pub enum BlocktreeError {
-    BlobForIndexExists,
-    InvalidBlobData(Box<bincode::ErrorKind>),
+    ShredForIndexExists,
+    InvalidShredData(Box<bincode::ErrorKind>),
     RocksDb(rocksdb::Error),
     #[cfg(feature = "kvstore")]
     KvsDb(kvstore::Error),
@@ -84,9 +83,7 @@
 pub struct Blocktree {
     db: Arc<Database>,
     meta_cf: LedgerColumn<cf::SlotMeta>,
-    data_cf: LedgerColumn<cf::Data>,
     dead_slots_cf: LedgerColumn<cf::DeadSlots>,
-    erasure_cf: LedgerColumn<cf::Coding>,
     erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
     orphans_cf: LedgerColumn<cf::Orphans>,
     index_cf: LedgerColumn<cf::Index>,
@@ -100,12 +97,8 @@ pub struct Blocktree {
 
 // Column family for metadata about a leader slot
 pub const META_CF: &str = "meta";
-// Column family for the data in a leader slot
-pub const DATA_CF: &str = "data";
 // Column family for slots that have been marked as dead
 pub const DEAD_SLOTS_CF: &str = "dead_slots";
-// Column family for erasure data
-pub const ERASURE_CF: &str = "erasure";
 pub const ERASURE_META_CF: &str = "erasure_meta";
 // Column family for orphans data
 pub const ORPHANS_CF: &str = "orphans";
@@ -132,15 +125,9 @@ impl Blocktree {
         // Create the metadata column family
         let meta_cf = db.column();
 
-        // Create the data column family
-        let data_cf = db.column();
-
         // Create the dead slots column family
        let dead_slots_cf = db.column();
 
-        // Create the erasure column family
-        let erasure_cf = db.column();
-
         let erasure_meta_cf = db.column();
 
         // Create the orphans column family. An "orphan" is defined as
@@ -165,9 +152,7 @@
         Ok(Blocktree {
             db,
             meta_cf,
-            data_cf,
             dead_slots_cf,
-            erasure_cf,
             erasure_meta_cf,
             orphans_cf,
             index_cf,
@@ -253,18 +238,10 @@
             .meta_cf
             .delete_slot(&mut write_batch, from_slot, batch_end)
             .unwrap_or(false)
-            && self
-                .data_cf
-                .delete_slot(&mut write_batch, from_slot, batch_end)
-                .unwrap_or(false)
             && self
                 .erasure_meta_cf
                 .delete_slot(&mut write_batch, from_slot, batch_end)
                 .unwrap_or(false)
-            && self
-                .erasure_cf
-                .delete_slot(&mut write_batch, from_slot, batch_end)
-                .unwrap_or(false)
             && self
                 .data_shred_cf
                 .delete_slot(&mut write_batch, from_slot, batch_end)
@@ -804,11 +781,6 @@
         self.meta_cf.put_bytes(slot, bytes)
     }
 
-    pub fn get_data_shred_as_blob(&self, slot: u64, shred_index: u64) -> Result<Option<Blob>> {
-        let bytes = self.get_data_shred(slot, shred_index)?;
-        Ok(bytes.map(|bytes| Blob::new(&bytes)))
-    }
-
     // Given a start and end entry index, find all the missing
     // indexes in the ledger in the range [start_index, end_index)
     // for the slot with the specified slot
@@ -1773,10 +1745,13 @@
 
         // Test erasure column family
         let erasure = vec![1u8; 16];
         let erasure_key = (0, 0);
-        ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap();
+        ledger
+            .code_shred_cf
+            .put_bytes(erasure_key, &erasure)
+            .unwrap();
         let result = ledger
-            .erasure_cf
+            .code_shred_cf
             .get_bytes(erasure_key)
             .unwrap()
             .expect("Expected erasure object to exist");
@@ -1786,10 +1761,10 @@
 
         // Test data column family
         let data = vec![2u8; 16];
         let data_key = (0, 0);
-        ledger.data_cf.put_bytes(data_key, &data).unwrap();
+        ledger.data_shred_cf.put_bytes(data_key, &data).unwrap();
         let result = ledger
-            .data_cf
+            .data_shred_cf
             .get_bytes(data_key)
             .unwrap()
             .expect("Expected data object to exist");
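
With the legacy data/erasure column families and the Blob accessor gone, Blocktree consumers read a data shred back as raw bytes via get_data_shred and re-wrap it only where the legacy Blob type is still required, which is what the cluster_info.rs hunks further below do. A minimal sketch of that wrapper, assuming get_data_shred returns Result<Option<Vec<u8>>> (its signature is not shown in this diff) and that Blob::new copies from a byte slice, as the removed accessor above implies:

    use crate::blocktree::Blocktree;
    use crate::packet::Blob;
    use crate::result::Result;

    // Read the raw shred payload and re-wrap it for legacy Blob consumers.
    fn data_shred_as_blob(blocktree: &Blocktree, slot: u64, index: u64) -> Result<Option<Blob>> {
        let bytes = blocktree.get_data_shred(slot, index)?;
        Ok(bytes.map(|bytes| Blob::new(&bytes)))
    }
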
diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs
index 2ca67f03b..5c3df658d 100644
--- a/core/src/blocktree/db.rs
+++ b/core/src/blocktree/db.rs
@@ -32,14 +32,6 @@ pub mod columns {
     /// Orphans Column
     pub struct Orphans;
 
-    #[derive(Debug)]
-    /// Erasure Column
-    pub struct Coding;
-
-    #[derive(Debug)]
-    /// Data Column
-    pub struct Data;
-
     #[derive(Debug)]
     /// Data Column
     pub struct DeadSlots;
diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs
index 3ee4ea7c3..51152223d 100644
--- a/core/src/blocktree/rocks.rs
+++ b/core/src/blocktree/rocks.rs
@@ -1,5 +1,7 @@
 use crate::blocktree::db::columns as cf;
-use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn, IteratorMode, IteratorDirection};
+use crate::blocktree::db::{
+    Backend, Column, DbCursor, IWriteBatch, IteratorDirection, IteratorMode, TypedColumn,
+};
 use crate::blocktree::BlocktreeError;
 use crate::result::{Error, Result};
 use solana_sdk::timing::Slot;
@@ -7,8 +9,8 @@
 
 use byteorder::{BigEndian, ByteOrder};
 use rocksdb::{
-    self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode as RocksIteratorMode,
-    Options, WriteBatch as RWriteBatch, DB,
+    self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction,
+    IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
 };
 
 use std::fs;
@@ -33,8 +35,7 @@
 impl Backend for Rocks {
     fn open(path: &Path) -> Result {
         use crate::blocktree::db::columns::{
-            Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData,
-            SlotMeta,
+            DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
         };
 
         fs::create_dir_all(&path)?;
@@ -45,12 +46,8 @@
         // Column family names
         let meta_cf_descriptor =
             ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(SlotMeta::NAME));
-        let data_cf_descriptor =
-            ColumnFamilyDescriptor::new(Data::NAME, get_cf_options(Data::NAME));
         let dead_slots_cf_descriptor =
             ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(DeadSlots::NAME));
-        let erasure_cf_descriptor =
-            ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options(Coding::NAME));
         let erasure_meta_cf_descriptor =
             ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(ErasureMeta::NAME));
         let orphans_cf_descriptor =
@@ -66,9 +63,7 @@
 
         let cfs = vec![
             meta_cf_descriptor,
-            data_cf_descriptor,
             dead_slots_cf_descriptor,
-            erasure_cf_descriptor,
             erasure_meta_cf_descriptor,
             orphans_cf_descriptor,
             root_cf_descriptor,
@@ -85,15 +80,12 @@
     fn columns(&self) -> Vec<&'static str> {
         use crate::blocktree::db::columns::{
-            Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData,
-            SlotMeta,
+            DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
         };
 
         vec![
-            Coding::NAME,
             ErasureMeta::NAME,
             DeadSlots::NAME,
-            Data::NAME,
             Index::NAME,
             Orphans::NAME,
             Root::NAME,
@@ -130,28 +122,24 @@
         Ok(())
     }
 
-    fn iterator_cf(&self, cf: ColumnFamily, iterator_mode: IteratorMode<&[u8]>,) -> Result {
+    fn iterator_cf(
+        &self,
+        cf: ColumnFamily,
+        iterator_mode: IteratorMode<&[u8]>,
+    ) -> Result {
         let iter = {
-        match iterator_mode {
-            IteratorMode::Start => {
-                self.0.iterator_cf(cf, RocksIteratorMode::Start)?
-            }
-            IteratorMode::End => {
-                self.0.iterator_cf(cf, RocksIteratorMode::End)?
-            }
-            IteratorMode::From(start_from, direction) => {
-                let rocks_direction = match direction {
-                    IteratorDirection::Forward => {
-                        Direction::Forward
-                    }
-                    IteratorDirection::Reverse => {
-                        Direction::Reverse
-                    }
-                };
-                self.0
-                    .iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
-            }
+            match iterator_mode {
+                IteratorMode::Start => self.0.iterator_cf(cf, RocksIteratorMode::Start)?,
+                IteratorMode::End => self.0.iterator_cf(cf, RocksIteratorMode::End)?,
+                IteratorMode::From(start_from, direction) => {
+                    let rocks_direction = match direction {
+                        IteratorDirection::Forward => Direction::Forward,
+                        IteratorDirection::Reverse => Direction::Reverse,
+                    };
+                    self.0
+                        .iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
                 }
+            }
             }
         };
 
         Ok(iter)
@@ -173,53 +161,6 @@
     }
 }
 
-impl Column for cf::Coding {
-    const NAME: &'static str = super::ERASURE_CF;
-    type Index = (u64, u64);
-
-    fn key(index: (u64, u64)) -> Vec<u8> {
-        cf::Data::key(index)
-    }
-
-    fn index(key: &[u8]) -> (u64, u64) {
-        cf::Data::index(key)
-    }
-
-    fn slot(index: Self::Index) -> Slot {
-        index.0
-    }
-
-    fn as_index(slot: Slot) -> Self::Index {
-        (slot, 0)
-    }
-}
-
-impl Column for cf::Data {
-    const NAME: &'static str = super::DATA_CF;
-    type Index = (u64, u64);
-
-    fn key((slot, index): (u64, u64)) -> Vec<u8> {
-        let mut key = vec![0; 16];
-        BigEndian::write_u64(&mut key[..8], slot);
-        BigEndian::write_u64(&mut key[8..16], index);
-        key
-    }
-
-    fn index(key: &[u8]) -> (u64, u64) {
-        let slot = BigEndian::read_u64(&key[..8]);
-        let index = BigEndian::read_u64(&key[8..16]);
-        (slot, index)
-    }
-
-    fn slot(index: Self::Index) -> Slot {
-        index.0
-    }
-
-    fn as_index(slot: Slot) -> Self::Index {
-        (slot, 0)
-    }
-}
-
 impl Column for cf::ShredCode {
     const NAME: &'static str = super::CODE_SHRED_CF;
     type Index = (u64, u64);
@@ -478,11 +419,11 @@ impl std::convert::From<rocksdb::Error> for Error {
 }
 
 fn get_cf_options(name: &'static str) -> Options {
-    use crate::blocktree::db::columns::{Coding, Data, ShredCode, ShredData};
+    use crate::blocktree::db::columns::{ShredCode, ShredData};
 
     let mut options = Options::default();
     match name {
-        Coding::NAME | Data::NAME | ShredCode::NAME | ShredData::NAME => {
+        ShredCode::NAME | ShredData::NAME => {
             // 512MB * 8 = 4GB. 2 of these columns should take no more than 8GB of RAM
             options.set_max_write_buffer_number(8);
             options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
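
For reference, the removed cf::Data impl above packs a (slot, index) pair into a 16-byte big-endian key, and cf::Coding simply delegated to the same functions; the surviving ShredData/ShredCode columns are assumed here to keep that layout (only the removed impls are shown in this diff). A self-contained round-trip sketch of the scheme:

    use byteorder::{BigEndian, ByteOrder};

    // Pack (slot, index) into a 16-byte big-endian key, as cf::Data::key did.
    fn key((slot, index): (u64, u64)) -> Vec<u8> {
        let mut key = vec![0; 16];
        BigEndian::write_u64(&mut key[..8], slot);
        BigEndian::write_u64(&mut key[8..16], index);
        key
    }

    // Recover (slot, index) from a key, mirroring cf::Data::index.
    fn index(key: &[u8]) -> (u64, u64) {
        (BigEndian::read_u64(&key[..8]), BigEndian::read_u64(&key[8..16]))
    }

    fn main() {
        let k = key((42, 7));
        assert_eq!(k.len(), 16);
        assert_eq!(index(&k), (42, 7));
        // Big-endian encoding makes byte-wise key order match (slot, index) order,
        // so slot-range iteration and delete_slot operate on contiguous key ranges.
    }
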
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index ba8cd28c0..c91925bbe 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -19,7 +19,7 @@ use crate::crds_gossip::CrdsGossip;
 use crate::crds_gossip_error::CrdsGossipError;
 use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
 use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
-use crate::packet::{to_shared_blob, Packet, SharedBlob};
+use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob};
 use crate::repair_service::RepairType;
 use crate::result::Result;
 use crate::staking_utils;
@@ -1035,6 +1035,15 @@
             .unwrap()
     }
 
+    fn get_data_shred_as_blob(
+        blocktree: &Arc<Blocktree>,
+        slot: u64,
+        shred_index: u64,
+    ) -> Result<Option<Blob>> {
+        let bytes = blocktree.get_data_shred(slot, shred_index)?;
+        Ok(bytes.map(|bytes| Blob::new(&bytes)))
+    }
+
     fn run_window_request(
         from: &ContactInfo,
         from_addr: &SocketAddr,
@@ -1045,7 +1054,7 @@
     ) -> Vec<SharedBlob> {
         if let Some(blocktree) = blocktree {
             // Try to find the requested index in one of the slots
-            let blob = blocktree.get_data_shred_as_blob(slot, blob_index);
+            let blob = Self::get_data_shred_as_blob(blocktree, slot, blob_index);
 
             if let Ok(Some(mut blob)) = blob {
                 inc_new_counter_debug!("cluster_info-window-request-ledger", 1);
@@ -1080,7 +1089,7 @@
         if let Ok(Some(meta)) = meta {
             if meta.received > highest_index {
                 // meta.received must be at least 1 by this point
-                let blob = blocktree.get_data_shred_as_blob(slot, meta.received - 1);
+                let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1);
 
                 if let Ok(Some(mut blob)) = blob {
                     blob.meta.set_addr(from_addr);
@@ -1106,7 +1115,7 @@
                 if meta.received == 0 {
                     break;
                 }
-                let blob = blocktree.get_data_shred_as_blob(slot, meta.received - 1);
+                let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1);
                 if let Ok(Some(mut blob)) = blob {
                     blob.meta.set_addr(from_addr);
                     res.push(Arc::new(RwLock::new(blob)));
@@ -2018,8 +2027,7 @@ mod tests {
             .rev()
             .map(|slot| {
                 let index = blocktree.meta(slot).unwrap().unwrap().received - 1;
-                blocktree
-                    .get_data_shred_as_blob(slot, index)
+                ClusterInfo::get_data_shred_as_blob(&blocktree, slot, index)
                     .unwrap()
                     .unwrap()
             })
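
The relocated helper keeps the repair/window-request path on legacy SharedBlobs. A sketch of how a response could be assembled from it inside cluster_info.rs, assuming SharedBlob is Arc<RwLock<Blob>> as the res.push(Arc::new(RwLock::new(blob))) call above implies; repair_response is a hypothetical name, not part of this diff:

    use crate::blocktree::Blocktree;
    use crate::packet::SharedBlob;
    use std::net::SocketAddr;
    use std::sync::{Arc, RwLock};

    // Hypothetical helper: fetch one data shred and package it as a repair
    // response addressed to the requester, mirroring run_window_request above.
    fn repair_response(
        blocktree: &Arc<Blocktree>,
        slot: u64,
        shred_index: u64,
        from_addr: &SocketAddr,
    ) -> Option<SharedBlob> {
        let mut blob = ClusterInfo::get_data_shred_as_blob(blocktree, slot, shred_index).ok()??;
        blob.meta.set_addr(from_addr);
        Some(Arc::new(RwLock::new(blob)))
    }
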
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 05215424c..4957b92dc 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -346,7 +346,7 @@
                 !Bank::can_commit(&tx_error)
             }
             Err(Error::BlobError(BlobError::VerificationFailed)) => true,
-            Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_))) => true,
+            Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(_))) => true,
             _ => false,
         }
     }