use crate::blockstore_meta;
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder};
use log::*;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
    self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
    IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_runtime::hardened_unpack::UnpackError;
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::{Rewards, TransactionStatusMeta};
use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc};
use thiserror::Error;

const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB

// Column family for metadata about a leader slot
const META_CF: &str = "meta";
// Column family for slots that have been marked as dead
const DEAD_SLOTS_CF: &str = "dead_slots";
// Column family for storing proof that there were multiple
// versions of a slot
const DUPLICATE_SLOTS_CF: &str = "duplicate_slots";
// Column family storing erasure metadata for a slot
const ERASURE_META_CF: &str = "erasure_meta";
// Column family for orphans data
const ORPHANS_CF: &str = "orphans";
// Column family for root data
const ROOT_CF: &str = "root";
/// Column family for indexes
const INDEX_CF: &str = "index";
/// Column family for Data Shreds
const DATA_SHRED_CF: &str = "data_shred";
/// Column family for Code Shreds
const CODE_SHRED_CF: &str = "code_shred";
/// Column family for Transaction Status
const TRANSACTION_STATUS_CF: &str = "transaction_status";
/// Column family for Address Signatures
const ADDRESS_SIGNATURES_CF: &str = "address_signatures";
/// Column family for Transaction Status Index
const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
/// Column family for Rewards
const REWARDS_CF: &str = "rewards";

#[derive(Error, Debug)]
pub enum BlockstoreError {
    ShredForIndexExists,
    InvalidShredData(Box<bincode::ErrorKind>),
    RocksDb(#[from] rocksdb::Error),
    SlotNotRooted,
    DeadSlot,
    IO(#[from] std::io::Error),
    Serialize(#[from] Box<bincode::ErrorKind>),
    FsExtraError(#[from] fs_extra::error::Error),
    SlotCleanedUp,
    UnpackError(#[from] UnpackError),
    UnableToSetOpenFileDescriptorLimit,
    TransactionStatusSlotMismatch,
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;

impl std::fmt::Display for BlockstoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "blockstore error")
    }
}

pub enum IteratorMode<Index> {
    Start,
    End,
    From(Index, IteratorDirection),
}

pub mod columns {
    #[derive(Debug)]
    /// The slot metadata column
    pub struct SlotMeta;

    #[derive(Debug)]
    /// The orphans column
    pub struct Orphans;

    #[derive(Debug)]
    /// The dead slots column
    pub struct DeadSlots;

    #[derive(Debug)]
    /// The duplicate slots column
    pub struct DuplicateSlots;

    #[derive(Debug)]
    /// The erasure meta column
    pub struct ErasureMeta;

    #[derive(Debug)]
    /// The root column
    pub struct Root;

    #[derive(Debug)]
    /// The index column
    pub struct Index;

    #[derive(Debug)]
    /// The shred data column
    pub struct ShredData;

    #[derive(Debug)]
    /// The shred erasure code column
    pub struct ShredCode;

    #[derive(Debug)]
    /// The transaction status column
    pub struct TransactionStatus;

    #[derive(Debug)]
    /// The address signatures column
    pub struct AddressSignatures;

    #[derive(Debug)]
    /// The transaction status index column
    pub struct TransactionStatusIndex;

    #[derive(Debug)]
    /// The rewards column
    pub struct Rewards;
}

pub enum AccessType {
    PrimaryOnly,
    TryPrimaryThenSecondary,
}

#[derive(Debug, PartialEq)]
pub enum ActualAccessType {
    Primary,
    Secondary,
}
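// A minimal sketch (not part of the upstream API) showing how the `#[from]`
// attributes on `BlockstoreError` let callers bubble underlying errors up
// with `?`. `read_ledger_file` is a hypothetical helper introduced only for
// illustration; the automatic `io::Error -> BlockstoreError::IO` conversion
// is what the derive provides.
#[cfg(test)]
mod error_conversion_sketch {
    use super::*;

    // Hypothetical helper: any I/O failure is mapped into
    // `BlockstoreError::IO` through the derived `From` impl.
    fn read_ledger_file(path: &Path) -> Result<Vec<u8>> {
        let bytes = fs::read(path)?; // io::Error -> BlockstoreError::IO
        Ok(bytes)
    }

    #[test]
    fn missing_file_maps_to_blockstore_error() {
        let err = read_ledger_file(Path::new("/nonexistent/ledger")).unwrap_err();
        assert!(matches!(err, BlockstoreError::IO(_)));
    }
}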
#[derive(Debug, Clone)]
pub enum BlockstoreRecoveryMode {
    TolerateCorruptedTailRecords,
    AbsoluteConsistency,
    PointInTime,
    SkipAnyCorruptedRecord,
}

impl From<&str> for BlockstoreRecoveryMode {
    fn from(string: &str) -> Self {
        match string {
            "tolerate_corrupted_tail_records" => {
                BlockstoreRecoveryMode::TolerateCorruptedTailRecords
            }
            "absolute_consistency" => BlockstoreRecoveryMode::AbsoluteConsistency,
            "point_in_time" => BlockstoreRecoveryMode::PointInTime,
            "skip_any_corrupted_record" => BlockstoreRecoveryMode::SkipAnyCorruptedRecord,
            bad_mode => panic!("Invalid recovery mode: {}", bad_mode),
        }
    }
}

impl Into<DBRecoveryMode> for BlockstoreRecoveryMode {
    fn into(self) -> DBRecoveryMode {
        match self {
            BlockstoreRecoveryMode::TolerateCorruptedTailRecords => {
                DBRecoveryMode::TolerateCorruptedTailRecords
            }
            BlockstoreRecoveryMode::AbsoluteConsistency => DBRecoveryMode::AbsoluteConsistency,
            BlockstoreRecoveryMode::PointInTime => DBRecoveryMode::PointInTime,
            BlockstoreRecoveryMode::SkipAnyCorruptedRecord => {
                DBRecoveryMode::SkipAnyCorruptedRecord
            }
        }
    }
}
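// Sketch of how a CLI-supplied recovery-mode string would flow into RocksDB
// options, mirroring what `Rocks::open` does below. The accepted string
// values are exactly those matched by the `From<&str>` impl above.
#[cfg(test)]
mod recovery_mode_sketch {
    use super::*;

    #[test]
    fn recovery_mode_string_round_trip() {
        let mode = BlockstoreRecoveryMode::from("skip_any_corrupted_record");
        let mut options = Options::default();
        // `Into<DBRecoveryMode>` is implemented above, so the conversion is direct.
        options.set_wal_recovery_mode(mode.into());
    }
}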
#[derive(Debug)]
struct Rocks(rocksdb::DB, ActualAccessType);

impl Rocks {
    fn open(
        path: &Path,
        access_type: AccessType,
        recovery_mode: Option<BlockstoreRecoveryMode>,
    ) -> Result<Rocks> {
        use columns::{
            AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards,
            Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
        };

        fs::create_dir_all(&path)?;

        // Use default database options
        let mut db_options = get_db_options();
        if let Some(recovery_mode) = recovery_mode {
            db_options.set_wal_recovery_mode(recovery_mode.into());
        }

        // Column family names
        let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
        let dead_slots_cf_descriptor =
            ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
        let duplicate_slots_cf_descriptor =
            ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options());
        let erasure_meta_cf_descriptor =
            ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
        let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
        let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
        let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
        let shred_data_cf_descriptor =
            ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options());
        let shred_code_cf_descriptor =
            ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options());
        let transaction_status_cf_descriptor =
            ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options());
        let address_signatures_cf_descriptor =
            ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options());
        let transaction_status_index_cf_descriptor =
            ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options());
        let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());

        let cfs = vec![
            (SlotMeta::NAME, meta_cf_descriptor),
            (DeadSlots::NAME, dead_slots_cf_descriptor),
            (DuplicateSlots::NAME, duplicate_slots_cf_descriptor),
            (ErasureMeta::NAME, erasure_meta_cf_descriptor),
            (Orphans::NAME, orphans_cf_descriptor),
            (Root::NAME, root_cf_descriptor),
            (Index::NAME, index_cf_descriptor),
            (ShredData::NAME, shred_data_cf_descriptor),
            (ShredCode::NAME, shred_code_cf_descriptor),
            (TransactionStatus::NAME, transaction_status_cf_descriptor),
            (AddressSignatures::NAME, address_signatures_cf_descriptor),
            (
                TransactionStatusIndex::NAME,
                transaction_status_index_cf_descriptor,
            ),
            (Rewards::NAME, rewards_cf_descriptor),
        ];

        // Open the database
        let db = match access_type {
            AccessType::PrimaryOnly => Rocks(
                DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
                ActualAccessType::Primary,
            ),
            AccessType::TryPrimaryThenSecondary => {
                let names: Vec<_> = cfs.iter().map(|c| c.0).collect();

                match DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1)) {
                    Ok(db) => Rocks(db, ActualAccessType::Primary),
                    Err(err) => {
                        let secondary_path = path.join("solana-secondary");

                        warn!("Error when opening as primary: {}", err);
                        warn!("Trying as secondary at: {:?}", secondary_path);
                        warn!("This active secondary db use may temporarily cause the performance of another db use (like by validator) to degrade");

                        // This is needed according to https://github.com/facebook/rocksdb/wiki/Secondary-instance
                        db_options.set_max_open_files(-1);

                        Rocks(
                            DB::open_cf_as_secondary(&db_options, path, &secondary_path, names)?,
                            ActualAccessType::Secondary,
                        )
                    }
                }
            }
        };

        Ok(db)
    }

    fn columns(&self) -> Vec<&'static str> {
        use columns::{
            AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards,
            Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
        };

        vec![
            ErasureMeta::NAME,
            DeadSlots::NAME,
            DuplicateSlots::NAME,
            Index::NAME,
            Orphans::NAME,
            Root::NAME,
            SlotMeta::NAME,
            ShredData::NAME,
            ShredCode::NAME,
            TransactionStatus::NAME,
            AddressSignatures::NAME,
            TransactionStatusIndex::NAME,
            Rewards::NAME,
        ]
    }

    fn destroy(path: &Path) -> Result<()> {
        DB::destroy(&Options::default(), path)?;
        Ok(())
    }

    fn cf_handle(&self, cf: &str) -> &ColumnFamily {
        self.0
            .cf_handle(cf)
            .expect("should never get an unknown column")
    }

    fn get_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let opt = self.0.get_cf(cf, key)?.map(|db_vec| db_vec.to_vec());
        Ok(opt)
    }

    fn put_cf(&self, cf: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
        self.0.put_cf(cf, key, value)?;
        Ok(())
    }

    fn iterator_cf<C>(
        &self,
        cf: &ColumnFamily,
        iterator_mode: IteratorMode<C::Index>,
    ) -> Result<DBIterator>
    where
        C: Column,
    {
        let start_key;
        let iterator_mode = match iterator_mode {
            IteratorMode::From(start_from, direction) => {
                start_key = C::key(start_from);
                RocksIteratorMode::From(&start_key, direction)
            }
            IteratorMode::Start => RocksIteratorMode::Start,
            IteratorMode::End => RocksIteratorMode::End,
        };
        let iter = self.0.iterator_cf(cf, iterator_mode);
        Ok(iter)
    }

    fn raw_iterator_cf(&self, cf: &ColumnFamily) -> Result<DBRawIterator> {
        let raw_iter = self.0.raw_iterator_cf(cf);
        Ok(raw_iter)
    }

    fn batch(&self) -> Result<RWriteBatch> {
        Ok(RWriteBatch::default())
    }

    fn write(&self, batch: RWriteBatch) -> Result<()> {
        self.0.write(batch)?;
        Ok(())
    }

    fn is_primary_access(&self) -> bool {
        self.1 == ActualAccessType::Primary
    }
}

pub trait Column {
    type Index;

    fn key_size() -> usize {
        std::mem::size_of::<Self::Index>()
    }

    fn key(index: Self::Index) -> Vec<u8>;
    fn index(key: &[u8]) -> Self::Index;
    fn primary_index(index: Self::Index) -> Slot;
    fn as_index(slot: Slot) -> Self::Index;
}

pub trait ColumnName {
    const NAME: &'static str;
}

pub trait TypedColumn: Column {
    type Type: Serialize + DeserializeOwned;
}

impl TypedColumn for columns::TransactionStatus {
    type Type = TransactionStatusMeta;
}

impl TypedColumn for columns::AddressSignatures {
    type Type = blockstore_meta::AddressSignatureMeta;
}

impl TypedColumn for columns::TransactionStatusIndex {
    type Type = blockstore_meta::TransactionStatusIndexMeta;
}

pub trait SlotColumn<Index = u64> {}

impl<T: SlotColumn> Column for T {
    type Index = u64;

    fn key(slot: u64) -> Vec<u8> {
        let mut key = vec![0; 8];
        BigEndian::write_u64(&mut key[..], slot);
        key
    }

    fn index(key: &[u8]) -> u64 {
        BigEndian::read_u64(&key[..8])
    }

    fn primary_index(index: u64) -> Slot {
        index
    }

    fn as_index(slot: Slot) -> u64 {
        slot
    }
}
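// Sketch illustrating why slot-keyed columns write their keys big-endian:
// RocksDB orders keys byte-wise, and big-endian encoding makes that byte
// order agree with numeric slot order, so range scans walk slots in
// ascending order. Uses only items defined in this file.
#[cfg(test)]
mod slot_key_sketch {
    use super::*;

    #[test]
    fn big_endian_keys_sort_by_slot() {
        let low = <columns::SlotMeta as Column>::key(3);
        let high = <columns::SlotMeta as Column>::key(300);
        // Byte-wise (lexicographic) comparison agrees with 3 < 300.
        assert!(low < high);
        // And the encoding round-trips through `index`.
        assert_eq!(<columns::SlotMeta as Column>::index(&high), 300);
    }
}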
impl Column for columns::TransactionStatus {
    type Index = (u64, Signature, Slot);

    fn key((index, signature, slot): (u64, Signature, Slot)) -> Vec<u8> {
        let mut key = vec![0; 8 + 64 + 8]; // size_of u64 + size_of Signature + size_of Slot
        BigEndian::write_u64(&mut key[0..8], index);
        key[8..72].clone_from_slice(&signature.as_ref()[0..64]);
        BigEndian::write_u64(&mut key[72..80], slot);
        key
    }

    fn index(key: &[u8]) -> (u64, Signature, Slot) {
        if key.len() != 80 {
            Self::as_index(0)
        } else {
            let index = BigEndian::read_u64(&key[0..8]);
            let signature = Signature::new(&key[8..72]);
            let slot = BigEndian::read_u64(&key[72..80]);
            (index, signature, slot)
        }
    }

    fn primary_index(index: Self::Index) -> u64 {
        index.0
    }

    fn as_index(index: u64) -> Self::Index {
        (index, Signature::default(), 0)
    }
}

impl ColumnName for columns::TransactionStatus {
    const NAME: &'static str = TRANSACTION_STATUS_CF;
}

impl Column for columns::AddressSignatures {
    type Index = (u64, Pubkey, Slot, Signature);

    fn key((index, pubkey, slot, signature): (u64, Pubkey, Slot, Signature)) -> Vec<u8> {
        let mut key = vec![0; 8 + 32 + 8 + 64]; // size_of u64 + size_of Pubkey + size_of Slot + size_of Signature
        BigEndian::write_u64(&mut key[0..8], index);
        key[8..40].clone_from_slice(&pubkey.as_ref()[0..32]);
        BigEndian::write_u64(&mut key[40..48], slot);
        key[48..112].clone_from_slice(&signature.as_ref()[0..64]);
        key
    }

    fn index(key: &[u8]) -> (u64, Pubkey, Slot, Signature) {
        let index = BigEndian::read_u64(&key[0..8]);
        let pubkey = Pubkey::new(&key[8..40]);
        let slot = BigEndian::read_u64(&key[40..48]);
        let signature = Signature::new(&key[48..112]);
        (index, pubkey, slot, signature)
    }

    fn primary_index(index: Self::Index) -> u64 {
        index.0
    }

    fn as_index(index: u64) -> Self::Index {
        (index, Pubkey::default(), 0, Signature::default())
    }
}

impl ColumnName for columns::AddressSignatures {
    const NAME: &'static str = ADDRESS_SIGNATURES_CF;
}

impl Column for columns::TransactionStatusIndex {
    type Index = u64;

    fn key(index: u64) -> Vec<u8> {
        let mut key = vec![0; 8];
        BigEndian::write_u64(&mut key[..], index);
        key
    }

    fn index(key: &[u8]) -> u64 {
        BigEndian::read_u64(&key[..8])
    }

    fn primary_index(index: u64) -> u64 {
        index
    }

    fn as_index(slot: u64) -> u64 {
        slot
    }
}

impl ColumnName for columns::TransactionStatusIndex {
    const NAME: &'static str = TRANSACTION_STATUS_INDEX_CF;
}

impl SlotColumn for columns::Rewards {}
impl ColumnName for columns::Rewards {
    const NAME: &'static str = REWARDS_CF;
}
impl TypedColumn for columns::Rewards {
    type Type = Rewards;
}

impl Column for columns::ShredCode {
    type Index = (u64, u64);

    fn key(index: (u64, u64)) -> Vec<u8> {
        columns::ShredData::key(index)
    }

    fn index(key: &[u8]) -> (u64, u64) {
        columns::ShredData::index(key)
    }

    fn primary_index(index: Self::Index) -> Slot {
        index.0
    }

    fn as_index(slot: Slot) -> Self::Index {
        (slot, 0)
    }
}

impl ColumnName for columns::ShredCode {
    const NAME: &'static str = CODE_SHRED_CF;
}

impl Column for columns::ShredData {
    type Index = (u64, u64);

    fn key((slot, index): (u64, u64)) -> Vec<u8> {
        let mut key = vec![0; 16];
        BigEndian::write_u64(&mut key[..8], slot);
        BigEndian::write_u64(&mut key[8..16], index);
        key
    }

    fn index(key: &[u8]) -> (u64, u64) {
        let slot = BigEndian::read_u64(&key[..8]);
        let index = BigEndian::read_u64(&key[8..16]);
        (slot, index)
    }

    fn primary_index(index: Self::Index) -> Slot {
        index.0
    }

    fn as_index(slot: Slot) -> Self::Index {
        (slot, 0)
    }
}

impl ColumnName for columns::ShredData {
    const NAME: &'static str = DATA_SHRED_CF;
}
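// Sketch of the 80-byte `TransactionStatus` key layout implemented above:
// bytes [0..8] hold the big-endian primary index, [8..72] the signature,
// and [72..80] the big-endian slot, so keys group by primary index first.
// The concrete values below are arbitrary.
#[cfg(test)]
mod transaction_status_key_sketch {
    use super::*;

    #[test]
    fn key_round_trips() {
        let key = columns::TransactionStatus::key((42, Signature::default(), 7));
        assert_eq!(key.len(), 80);

        let (primary, signature, slot) = columns::TransactionStatus::index(&key);
        assert_eq!((primary, slot), (42, 7));
        assert_eq!(signature, Signature::default());
    }
}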
impl SlotColumn for columns::Index {}
impl ColumnName for columns::Index {
    const NAME: &'static str = INDEX_CF;
}
impl TypedColumn for columns::Index {
    type Type = blockstore_meta::Index;
}

impl SlotColumn for columns::DeadSlots {}
impl ColumnName for columns::DeadSlots {
    const NAME: &'static str = DEAD_SLOTS_CF;
}
impl TypedColumn for columns::DeadSlots {
    type Type = bool;
}

impl SlotColumn for columns::DuplicateSlots {}
impl ColumnName for columns::DuplicateSlots {
    const NAME: &'static str = DUPLICATE_SLOTS_CF;
}
impl TypedColumn for columns::DuplicateSlots {
    type Type = blockstore_meta::DuplicateSlotProof;
}

impl SlotColumn for columns::Orphans {}
impl ColumnName for columns::Orphans {
    const NAME: &'static str = ORPHANS_CF;
}
impl TypedColumn for columns::Orphans {
    type Type = bool;
}

impl SlotColumn for columns::Root {}
impl ColumnName for columns::Root {
    const NAME: &'static str = ROOT_CF;
}
impl TypedColumn for columns::Root {
    type Type = bool;
}

impl SlotColumn for columns::SlotMeta {}
impl ColumnName for columns::SlotMeta {
    const NAME: &'static str = META_CF;
}
impl TypedColumn for columns::SlotMeta {
    type Type = blockstore_meta::SlotMeta;
}

impl Column for columns::ErasureMeta {
    type Index = (u64, u64);

    fn index(key: &[u8]) -> (u64, u64) {
        let slot = BigEndian::read_u64(&key[..8]);
        let set_index = BigEndian::read_u64(&key[8..]);
        (slot, set_index)
    }

    fn key((slot, set_index): (u64, u64)) -> Vec<u8> {
        let mut key = vec![0; 16];
        BigEndian::write_u64(&mut key[..8], slot);
        BigEndian::write_u64(&mut key[8..], set_index);
        key
    }

    fn primary_index(index: Self::Index) -> Slot {
        index.0
    }

    fn as_index(slot: Slot) -> Self::Index {
        (slot, 0)
    }
}

impl ColumnName for columns::ErasureMeta {
    const NAME: &'static str = ERASURE_META_CF;
}
impl TypedColumn for columns::ErasureMeta {
    type Type = blockstore_meta::ErasureMeta;
}

#[derive(Debug, Clone)]
pub struct Database {
    backend: Arc<Rocks>,
    path: Arc<Path>,
}

#[derive(Debug, Clone)]
pub struct LedgerColumn<C>
where
    C: Column,
{
    backend: Arc<Rocks>,
    column: PhantomData<C>,
}

pub struct WriteBatch<'a> {
    write_batch: RWriteBatch,
    map: HashMap<&'static str, &'a ColumnFamily>,
}
impl Database {
    pub fn open(
        path: &Path,
        access_type: AccessType,
        recovery_mode: Option<BlockstoreRecoveryMode>,
    ) -> Result<Self> {
        let backend = Arc::new(Rocks::open(path, access_type, recovery_mode)?);

        Ok(Database {
            backend,
            path: Arc::from(path),
        })
    }

    pub fn destroy(path: &Path) -> Result<()> {
        Rocks::destroy(path)?;
        Ok(())
    }

    pub fn get<C>(&self, key: C::Index) -> Result<Option<C::Type>>
    where
        C: TypedColumn + ColumnName,
    {
        if let Some(serialized_value) = self.backend.get_cf(self.cf_handle::<C>(), &C::key(key))? {
            let value = deserialize(&serialized_value)?;
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    pub fn iter<'a, C>(
        &'a self,
        iterator_mode: IteratorMode<C::Index>,
    ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + 'a>
    where
        C: Column + ColumnName,
    {
        let cf = self.cf_handle::<C>();
        let iter = self.backend.iterator_cf::<C>(cf, iterator_mode)?;
        Ok(iter.map(|(key, value)| (C::index(&key), value)))
    }

    #[inline]
    pub fn cf_handle<C>(&self) -> &ColumnFamily
    where
        C: Column + ColumnName,
    {
        self.backend.cf_handle(C::NAME)
    }

    pub fn column<C>(&self) -> LedgerColumn<C>
    where
        C: Column + ColumnName,
    {
        LedgerColumn {
            backend: Arc::clone(&self.backend),
            column: PhantomData,
        }
    }

    #[inline]
    pub fn raw_iterator_cf(&self, cf: &ColumnFamily) -> Result<DBRawIterator> {
        self.backend.raw_iterator_cf(cf)
    }

    pub fn batch(&self) -> Result<WriteBatch> {
        let write_batch = self.backend.batch()?;
        let map = self
            .backend
            .columns()
            .into_iter()
            .map(|desc| (desc, self.backend.cf_handle(desc)))
            .collect();

        Ok(WriteBatch { write_batch, map })
    }

    pub fn write(&self, batch: WriteBatch) -> Result<()> {
        self.backend.write(batch.write_batch)
    }

    pub fn storage_size(&self) -> Result<u64> {
        Ok(fs_extra::dir::get_size(&self.path)?)
    }

    // Adds a range to delete to the given write batch
    pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()>
    where
        C: Column + ColumnName,
    {
        let cf = self.cf_handle::<C>();
        let from_index = C::as_index(from);
        let to_index = C::as_index(to);
        batch.delete_range_cf::<C>(cf, from_index, to_index)
    }

    pub fn is_primary_access(&self) -> bool {
        self.backend.is_primary_access()
    }
}

impl<C> LedgerColumn<C>
where
    C: Column + ColumnName,
{
    pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
        self.backend.get_cf(self.handle(), &C::key(key))
    }

    pub fn iter<'a>(
        &'a self,
        iterator_mode: IteratorMode<C::Index>,
    ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + 'a> {
        let cf = self.handle();
        let iter = self.backend.iterator_cf::<C>(cf, iterator_mode)?;
        Ok(iter.map(|(key, value)| (C::index(&key), value)))
    }

    pub fn delete_slot(
        &self,
        batch: &mut WriteBatch,
        from: Option<Slot>,
        to: Option<Slot>,
    ) -> Result<bool>
    where
        C::Index: PartialOrd + Copy,
    {
        let mut end = true;
        let iter_config = match from {
            Some(s) => IteratorMode::From(C::as_index(s), IteratorDirection::Forward),
            None => IteratorMode::Start,
        };
        let iter = self.iter(iter_config)?;
        for (index, _) in iter {
            if let Some(to) = to {
                if C::primary_index(index) > to {
                    end = false;
                    break;
                }
            };
            if let Err(e) = batch.delete::<C>(index) {
                error!(
                    "Error: {:?} while adding delete from_slot {:?} to batch {:?}",
                    e,
                    from,
                    C::NAME
                )
            }
        }
        Ok(end)
    }

    pub fn compact_range(&self, from: Slot, to: Slot) -> Result<bool>
    where
        C::Index: PartialOrd + Copy,
    {
        let cf = self.handle();
        let from = Some(C::key(C::as_index(from)));
        let to = Some(C::key(C::as_index(to)));
        self.backend.0.compact_range_cf(cf, from, to);
        Ok(true)
    }

    #[inline]
    pub fn handle(&self) -> &ColumnFamily {
        self.backend.cf_handle(C::NAME)
    }

    #[cfg(test)]
    pub fn is_empty(&self) -> Result<bool> {
        let mut iter = self.backend.raw_iterator_cf(self.handle())?;
        iter.seek_to_first();
        Ok(!iter.valid())
    }

    pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> {
        self.backend.put_cf(self.handle(), &C::key(key), value)
    }
}
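// End-to-end usage sketch: open a primary `Database`, fetch a typed column
// handle, and round-trip a value through `put`/`get` (defined on
// `LedgerColumn` below). Assumes a writable scratch directory; the path name
// is illustrative only and not used elsewhere in this crate.
#[cfg(test)]
mod database_usage_sketch {
    use super::*;

    #[test]
    fn put_and_get_dead_slot_flag() {
        let path = std::env::temp_dir().join("blockstore-db-usage-sketch");
        let db = Database::open(&path, AccessType::PrimaryOnly, None).unwrap();

        // `DeadSlots` is keyed by slot (via `SlotColumn`) and stores a bool.
        let dead_slots = db.column::<columns::DeadSlots>();
        dead_slots.put(9, &true).unwrap();
        assert_eq!(dead_slots.get(9).unwrap(), Some(true));

        drop(dead_slots);
        drop(db);
        Database::destroy(&path).unwrap();
    }
}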
impl<C> LedgerColumn<C>
where
    C: TypedColumn + ColumnName,
{
    pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
        if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
            let value = deserialize(&serialized_value)?;
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> {
        let serialized_value = serialize(value)?;
        self.backend
            .put_cf(self.handle(), &C::key(key), &serialized_value)
    }
}

impl<'a> WriteBatch<'a> {
    pub fn put_bytes<C: Column + ColumnName>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
        self.write_batch
            .put_cf(self.get_cf::<C>(), &C::key(key), bytes);
        Ok(())
    }

    pub fn delete<C: Column + ColumnName>(&mut self, key: C::Index) -> Result<()> {
        self.write_batch.delete_cf(self.get_cf::<C>(), &C::key(key));
        Ok(())
    }

    pub fn put<C: TypedColumn + ColumnName>(
        &mut self,
        key: C::Index,
        value: &C::Type,
    ) -> Result<()> {
        let serialized_value = serialize(&value)?;
        self.write_batch
            .put_cf(self.get_cf::<C>(), &C::key(key), &serialized_value);
        Ok(())
    }

    #[inline]
    fn get_cf<C: Column + ColumnName>(&self) -> &'a ColumnFamily {
        self.map[C::NAME]
    }

    pub fn delete_range_cf<C: Column>(
        &mut self,
        cf: &ColumnFamily,
        from: C::Index,
        to: C::Index,
    ) -> Result<()> {
        self.write_batch
            .delete_range_cf(cf, C::key(from), C::key(to));
        Ok(())
    }
}

fn get_cf_options() -> Options {
    let mut options = Options::default();
    // 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
    options.set_max_write_buffer_number(8);
    options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
    let file_num_compaction_trigger = 4;
    // Recommend that this be around the size of level 0. Level 0 estimated size in stable state is
    // write_buffer_size * min_write_buffer_number_to_merge * level0_file_num_compaction_trigger
    // Source: https://docs.rs/rocksdb/0.6.0/rocksdb/struct.Options.html#method.set_level_zero_file_num_compaction_trigger
    let total_size_base = MAX_WRITE_BUFFER_SIZE * file_num_compaction_trigger;
    let file_size_base = total_size_base / 10;
    options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
    options.set_max_bytes_for_level_base(total_size_base);
    options.set_target_file_size_base(file_size_base);
    options
}

fn get_db_options() -> Options {
    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    // A good value for this is the number of cores on the machine
    options.increase_parallelism(num_cpus::get() as i32);
    // Set max total wal size to 4G.
    options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
    options
}
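// Sketch checking the arithmetic behind `get_cf_options`: with a 256MB write
// buffer and a level-0 compaction trigger of 4, the level base works out to
// 1GB and the target file size to one tenth of that (~102MiB, rounded down
// by integer division).
#[cfg(test)]
mod cf_options_math_sketch {
    // Mirrors the local `file_num_compaction_trigger` in `get_cf_options`.
    const FILE_NUM_COMPACTION_TRIGGER: u64 = 4;

    #[test]
    fn derived_sizes() {
        let total_size_base = super::MAX_WRITE_BUFFER_SIZE * FILE_NUM_COMPACTION_TRIGGER;
        assert_eq!(total_size_base, 1024 * 1024 * 1024); // 1GB level base
        assert_eq!(total_size_base / 10, 107_374_182); // ~102MiB target file size
    }
}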