Further cleanup of blocktree after Blob deprecation (#5780)
This commit is contained in:
parent
b19d9a50d3
commit
02ee2a601c
|
@ -3,7 +3,6 @@
|
|||
//! access read to a persistent file-based ledger.
|
||||
use crate::entry::Entry;
|
||||
use crate::erasure::ErasureConfig;
|
||||
use crate::packet::Blob;
|
||||
use crate::result::{Error, Result};
|
||||
use crate::shred::{Shred, Shredder};
|
||||
|
||||
|
@ -72,8 +71,8 @@ pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
|
|||
|
||||
#[derive(Debug)]
|
||||
pub enum BlocktreeError {
|
||||
BlobForIndexExists,
|
||||
InvalidBlobData(Box<bincode::ErrorKind>),
|
||||
ShredForIndexExists,
|
||||
InvalidShredData(Box<bincode::ErrorKind>),
|
||||
RocksDb(rocksdb::Error),
|
||||
#[cfg(feature = "kvstore")]
|
||||
KvsDb(kvstore::Error),
|
||||
|
@ -84,9 +83,7 @@ pub enum BlocktreeError {
|
|||
pub struct Blocktree {
|
||||
db: Arc<Database>,
|
||||
meta_cf: LedgerColumn<cf::SlotMeta>,
|
||||
data_cf: LedgerColumn<cf::Data>,
|
||||
dead_slots_cf: LedgerColumn<cf::DeadSlots>,
|
||||
erasure_cf: LedgerColumn<cf::Coding>,
|
||||
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
|
||||
orphans_cf: LedgerColumn<cf::Orphans>,
|
||||
index_cf: LedgerColumn<cf::Index>,
|
||||
|
@ -100,12 +97,8 @@ pub struct Blocktree {
|
|||
|
||||
// Column family for metadata about a leader slot
|
||||
pub const META_CF: &str = "meta";
|
||||
// Column family for the data in a leader slot
|
||||
pub const DATA_CF: &str = "data";
|
||||
// Column family for slots that have been marked as dead
|
||||
pub const DEAD_SLOTS_CF: &str = "dead_slots";
|
||||
// Column family for erasure data
|
||||
pub const ERASURE_CF: &str = "erasure";
|
||||
pub const ERASURE_META_CF: &str = "erasure_meta";
|
||||
// Column family for orphans data
|
||||
pub const ORPHANS_CF: &str = "orphans";
|
||||
|
@ -132,15 +125,9 @@ impl Blocktree {
|
|||
// Create the metadata column family
|
||||
let meta_cf = db.column();
|
||||
|
||||
// Create the data column family
|
||||
let data_cf = db.column();
|
||||
|
||||
// Create the dead slots column family
|
||||
let dead_slots_cf = db.column();
|
||||
|
||||
// Create the erasure column family
|
||||
let erasure_cf = db.column();
|
||||
|
||||
let erasure_meta_cf = db.column();
|
||||
|
||||
// Create the orphans column family. An "orphan" is defined as
|
||||
|
@ -165,9 +152,7 @@ impl Blocktree {
|
|||
Ok(Blocktree {
|
||||
db,
|
||||
meta_cf,
|
||||
data_cf,
|
||||
dead_slots_cf,
|
||||
erasure_cf,
|
||||
erasure_meta_cf,
|
||||
orphans_cf,
|
||||
index_cf,
|
||||
|
@ -253,18 +238,10 @@ impl Blocktree {
|
|||
.meta_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
&& self
|
||||
.data_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
&& self
|
||||
.erasure_meta_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
&& self
|
||||
.erasure_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
.unwrap_or(false)
|
||||
&& self
|
||||
.data_shred_cf
|
||||
.delete_slot(&mut write_batch, from_slot, batch_end)
|
||||
|
@ -804,11 +781,6 @@ impl Blocktree {
|
|||
self.meta_cf.put_bytes(slot, bytes)
|
||||
}
|
||||
|
||||
pub fn get_data_shred_as_blob(&self, slot: u64, shred_index: u64) -> Result<Option<Blob>> {
|
||||
let bytes = self.get_data_shred(slot, shred_index)?;
|
||||
Ok(bytes.map(|bytes| Blob::new(&bytes)))
|
||||
}
|
||||
|
||||
// Given a start and end entry index, find all the missing
|
||||
// indexes in the ledger in the range [start_index, end_index)
|
||||
// for the slot with the specified slot
|
||||
|
@ -1773,10 +1745,13 @@ pub mod tests {
|
|||
// Test erasure column family
|
||||
let erasure = vec![1u8; 16];
|
||||
let erasure_key = (0, 0);
|
||||
ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap();
|
||||
ledger
|
||||
.code_shred_cf
|
||||
.put_bytes(erasure_key, &erasure)
|
||||
.unwrap();
|
||||
|
||||
let result = ledger
|
||||
.erasure_cf
|
||||
.code_shred_cf
|
||||
.get_bytes(erasure_key)
|
||||
.unwrap()
|
||||
.expect("Expected erasure object to exist");
|
||||
|
@ -1786,10 +1761,10 @@ pub mod tests {
|
|||
// Test data column family
|
||||
let data = vec![2u8; 16];
|
||||
let data_key = (0, 0);
|
||||
ledger.data_cf.put_bytes(data_key, &data).unwrap();
|
||||
ledger.data_shred_cf.put_bytes(data_key, &data).unwrap();
|
||||
|
||||
let result = ledger
|
||||
.data_cf
|
||||
.data_shred_cf
|
||||
.get_bytes(data_key)
|
||||
.unwrap()
|
||||
.expect("Expected data object to exist");
|
||||
|
|
|
@ -32,14 +32,6 @@ pub mod columns {
|
|||
/// Orphans Column
|
||||
pub struct Orphans;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Erasure Column
|
||||
pub struct Coding;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Data Column
|
||||
pub struct Data;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Data Column
|
||||
pub struct DeadSlots;
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
use crate::blocktree::db::columns as cf;
|
||||
use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn, IteratorMode, IteratorDirection};
|
||||
use crate::blocktree::db::{
|
||||
Backend, Column, DbCursor, IWriteBatch, IteratorDirection, IteratorMode, TypedColumn,
|
||||
};
|
||||
use crate::blocktree::BlocktreeError;
|
||||
use crate::result::{Error, Result};
|
||||
use solana_sdk::timing::Slot;
|
||||
|
@ -7,8 +9,8 @@ use solana_sdk::timing::Slot;
|
|||
use byteorder::{BigEndian, ByteOrder};
|
||||
|
||||
use rocksdb::{
|
||||
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode as RocksIteratorMode,
|
||||
Options, WriteBatch as RWriteBatch, DB,
|
||||
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction,
|
||||
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
|
||||
};
|
||||
|
||||
use std::fs;
|
||||
|
@ -33,8 +35,7 @@ impl Backend for Rocks {
|
|||
|
||||
fn open(path: &Path) -> Result<Rocks> {
|
||||
use crate::blocktree::db::columns::{
|
||||
Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData,
|
||||
SlotMeta,
|
||||
DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
|
||||
};
|
||||
|
||||
fs::create_dir_all(&path)?;
|
||||
|
@ -45,12 +46,8 @@ impl Backend for Rocks {
|
|||
// Column family names
|
||||
let meta_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(SlotMeta::NAME));
|
||||
let data_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Data::NAME, get_cf_options(Data::NAME));
|
||||
let dead_slots_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(DeadSlots::NAME));
|
||||
let erasure_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options(Coding::NAME));
|
||||
let erasure_meta_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(ErasureMeta::NAME));
|
||||
let orphans_cf_descriptor =
|
||||
|
@ -66,9 +63,7 @@ impl Backend for Rocks {
|
|||
|
||||
let cfs = vec![
|
||||
meta_cf_descriptor,
|
||||
data_cf_descriptor,
|
||||
dead_slots_cf_descriptor,
|
||||
erasure_cf_descriptor,
|
||||
erasure_meta_cf_descriptor,
|
||||
orphans_cf_descriptor,
|
||||
root_cf_descriptor,
|
||||
|
@ -85,15 +80,12 @@ impl Backend for Rocks {
|
|||
|
||||
fn columns(&self) -> Vec<&'static str> {
|
||||
use crate::blocktree::db::columns::{
|
||||
Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData,
|
||||
SlotMeta,
|
||||
DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
|
||||
};
|
||||
|
||||
vec![
|
||||
Coding::NAME,
|
||||
ErasureMeta::NAME,
|
||||
DeadSlots::NAME,
|
||||
Data::NAME,
|
||||
Index::NAME,
|
||||
Orphans::NAME,
|
||||
Root::NAME,
|
||||
|
@ -130,28 +122,24 @@ impl Backend for Rocks {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn iterator_cf(&self, cf: ColumnFamily, iterator_mode: IteratorMode<&[u8]>,) -> Result<DBIterator> {
|
||||
fn iterator_cf(
|
||||
&self,
|
||||
cf: ColumnFamily,
|
||||
iterator_mode: IteratorMode<&[u8]>,
|
||||
) -> Result<DBIterator> {
|
||||
let iter = {
|
||||
match iterator_mode {
|
||||
IteratorMode::Start => {
|
||||
self.0.iterator_cf(cf, RocksIteratorMode::Start)?
|
||||
}
|
||||
IteratorMode::End => {
|
||||
self.0.iterator_cf(cf, RocksIteratorMode::End)?
|
||||
}
|
||||
IteratorMode::From(start_from, direction) => {
|
||||
let rocks_direction = match direction {
|
||||
IteratorDirection::Forward => {
|
||||
Direction::Forward
|
||||
}
|
||||
IteratorDirection::Reverse => {
|
||||
Direction::Reverse
|
||||
}
|
||||
};
|
||||
self.0
|
||||
.iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
|
||||
}
|
||||
match iterator_mode {
|
||||
IteratorMode::Start => self.0.iterator_cf(cf, RocksIteratorMode::Start)?,
|
||||
IteratorMode::End => self.0.iterator_cf(cf, RocksIteratorMode::End)?,
|
||||
IteratorMode::From(start_from, direction) => {
|
||||
let rocks_direction = match direction {
|
||||
IteratorDirection::Forward => Direction::Forward,
|
||||
IteratorDirection::Reverse => Direction::Reverse,
|
||||
};
|
||||
self.0
|
||||
.iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(iter)
|
||||
|
@ -173,53 +161,6 @@ impl Backend for Rocks {
|
|||
}
|
||||
}
|
||||
|
||||
impl Column<Rocks> for cf::Coding {
|
||||
const NAME: &'static str = super::ERASURE_CF;
|
||||
type Index = (u64, u64);
|
||||
|
||||
fn key(index: (u64, u64)) -> Vec<u8> {
|
||||
cf::Data::key(index)
|
||||
}
|
||||
|
||||
fn index(key: &[u8]) -> (u64, u64) {
|
||||
cf::Data::index(key)
|
||||
}
|
||||
|
||||
fn slot(index: Self::Index) -> Slot {
|
||||
index.0
|
||||
}
|
||||
|
||||
fn as_index(slot: Slot) -> Self::Index {
|
||||
(slot, 0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Column<Rocks> for cf::Data {
|
||||
const NAME: &'static str = super::DATA_CF;
|
||||
type Index = (u64, u64);
|
||||
|
||||
fn key((slot, index): (u64, u64)) -> Vec<u8> {
|
||||
let mut key = vec![0; 16];
|
||||
BigEndian::write_u64(&mut key[..8], slot);
|
||||
BigEndian::write_u64(&mut key[8..16], index);
|
||||
key
|
||||
}
|
||||
|
||||
fn index(key: &[u8]) -> (u64, u64) {
|
||||
let slot = BigEndian::read_u64(&key[..8]);
|
||||
let index = BigEndian::read_u64(&key[8..16]);
|
||||
(slot, index)
|
||||
}
|
||||
|
||||
fn slot(index: Self::Index) -> Slot {
|
||||
index.0
|
||||
}
|
||||
|
||||
fn as_index(slot: Slot) -> Self::Index {
|
||||
(slot, 0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Column<Rocks> for cf::ShredCode {
|
||||
const NAME: &'static str = super::CODE_SHRED_CF;
|
||||
type Index = (u64, u64);
|
||||
|
@ -478,11 +419,11 @@ impl std::convert::From<rocksdb::Error> for Error {
|
|||
}
|
||||
|
||||
fn get_cf_options(name: &'static str) -> Options {
|
||||
use crate::blocktree::db::columns::{Coding, Data, ShredCode, ShredData};
|
||||
use crate::blocktree::db::columns::{ShredCode, ShredData};
|
||||
|
||||
let mut options = Options::default();
|
||||
match name {
|
||||
Coding::NAME | Data::NAME | ShredCode::NAME | ShredData::NAME => {
|
||||
ShredCode::NAME | ShredData::NAME => {
|
||||
// 512MB * 8 = 4GB. 2 of these columns should take no more than 8GB of RAM
|
||||
options.set_max_write_buffer_number(8);
|
||||
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
|
||||
|
|
|
@ -19,7 +19,7 @@ use crate::crds_gossip::CrdsGossip;
|
|||
use crate::crds_gossip_error::CrdsGossipError;
|
||||
use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
|
||||
use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
|
||||
use crate::packet::{to_shared_blob, Packet, SharedBlob};
|
||||
use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob};
|
||||
use crate::repair_service::RepairType;
|
||||
use crate::result::Result;
|
||||
use crate::staking_utils;
|
||||
|
@ -1035,6 +1035,15 @@ impl ClusterInfo {
|
|||
.unwrap()
|
||||
}
|
||||
|
||||
fn get_data_shred_as_blob(
|
||||
blocktree: &Arc<Blocktree>,
|
||||
slot: u64,
|
||||
shred_index: u64,
|
||||
) -> Result<Option<Blob>> {
|
||||
let bytes = blocktree.get_data_shred(slot, shred_index)?;
|
||||
Ok(bytes.map(|bytes| Blob::new(&bytes)))
|
||||
}
|
||||
|
||||
fn run_window_request(
|
||||
from: &ContactInfo,
|
||||
from_addr: &SocketAddr,
|
||||
|
@ -1045,7 +1054,7 @@ impl ClusterInfo {
|
|||
) -> Vec<SharedBlob> {
|
||||
if let Some(blocktree) = blocktree {
|
||||
// Try to find the requested index in one of the slots
|
||||
let blob = blocktree.get_data_shred_as_blob(slot, blob_index);
|
||||
let blob = Self::get_data_shred_as_blob(blocktree, slot, blob_index);
|
||||
|
||||
if let Ok(Some(mut blob)) = blob {
|
||||
inc_new_counter_debug!("cluster_info-window-request-ledger", 1);
|
||||
|
@ -1080,7 +1089,7 @@ impl ClusterInfo {
|
|||
if let Ok(Some(meta)) = meta {
|
||||
if meta.received > highest_index {
|
||||
// meta.received must be at least 1 by this point
|
||||
let blob = blocktree.get_data_shred_as_blob(slot, meta.received - 1);
|
||||
let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1);
|
||||
|
||||
if let Ok(Some(mut blob)) = blob {
|
||||
blob.meta.set_addr(from_addr);
|
||||
|
@ -1106,7 +1115,7 @@ impl ClusterInfo {
|
|||
if meta.received == 0 {
|
||||
break;
|
||||
}
|
||||
let blob = blocktree.get_data_shred_as_blob(slot, meta.received - 1);
|
||||
let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1);
|
||||
if let Ok(Some(mut blob)) = blob {
|
||||
blob.meta.set_addr(from_addr);
|
||||
res.push(Arc::new(RwLock::new(blob)));
|
||||
|
@ -2018,8 +2027,7 @@ mod tests {
|
|||
.rev()
|
||||
.map(|slot| {
|
||||
let index = blocktree.meta(slot).unwrap().unwrap().received - 1;
|
||||
blocktree
|
||||
.get_data_shred_as_blob(slot, index)
|
||||
ClusterInfo::get_data_shred_as_blob(&blocktree, slot, index)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
})
|
||||
|
|
|
@ -346,7 +346,7 @@ impl ReplayStage {
|
|||
!Bank::can_commit(&tx_error)
|
||||
}
|
||||
Err(Error::BlobError(BlobError::VerificationFailed)) => true,
|
||||
Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_))) => true,
|
||||
Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(_))) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue