refactor(state): move all RocksDB API calls to the disk_db module (#3578)

* refactor(state): move RocksDB-specific initialization to a new module

* refactor(state): move RocksDB-specific shutdown to a new module

* refactor(state): temporarily allow RocksDB-specific reads and writes, without a new module

Unlike the last few commits, this one actually compiles.

* refactor(state): add a DiskWriteBatch wrapper for RocksDB writes

* refactor(state): move finalized state test methods to a test module
teor 2022-02-22 22:59:44 +10:00 committed by GitHub
parent 8e36686cc3
commit 32017f992b
6 changed files with 433 additions and 334 deletions

View File

@@ -1,8 +1,6 @@
use std::{convert::TryInto, path::PathBuf};
use std::path::PathBuf;
use rlimit::increase_nofile_limit;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use zebra_chain::parameters::Network;
@@ -57,34 +55,14 @@ fn gen_temp_path(prefix: &str) -> PathBuf {
}
impl Config {
/// The ideal open file limit for Zebra
const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;
/// The minimum number of open files for Zebra to operate normally. Also used
/// as the default open file limit, when the OS doesn't tell us how many
/// files we can use.
///
/// We want 100+ file descriptors for peers, and 100+ for the database.
///
/// On Windows, the default limit is 512 high-level I/O files, and 8192
/// low-level I/O files:
/// https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
const MIN_OPEN_FILE_LIMIT: u64 = 512;
/// The number of files used internally by Zebra.
///
/// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
/// stdio (3), and other OS facilities (2+).
const RESERVED_FILE_COUNT: u64 = 48;
/// Returns the path and database options for the finalized state database
pub(crate) fn db_config(&self, network: Network) -> (PathBuf, rocksdb::Options) {
/// Returns the path for the finalized state database
pub(crate) fn db_path(&self, network: Network) -> PathBuf {
let net_dir = match network {
Network::Mainnet => "mainnet",
Network::Testnet => "testnet",
};
let path = if self.ephemeral {
if self.ephemeral {
gen_temp_path(&format!(
"zebra-state-v{}-{}",
crate::constants::DATABASE_FORMAT_VERSION,
@@ -95,25 +73,7 @@ impl Config {
.join("state")
.join(format!("v{}", crate::constants::DATABASE_FORMAT_VERSION))
.join(net_dir)
};
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let open_file_limit = Config::increase_open_file_limit();
let db_file_limit = Config::get_db_open_file_limit(open_file_limit);
// If the current limit is very large, set the DB limit using the ideal limit
let ideal_limit = Config::get_db_open_file_limit(Config::IDEAL_OPEN_FILE_LIMIT)
.try_into()
.expect("ideal open file limit fits in a c_int");
let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);
opts.set_max_open_files(db_file_limit);
(path, opts)
}
}
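
A hedged usage sketch (not part of this diff) of the new `db_path` accessor; the exact directory depends on the configured `cache_dir` and the current `DATABASE_FORMAT_VERSION`:

let config = Config::default();
// Non-ephemeral configs resolve to a versioned, per-network directory,
// e.g. "<cache_dir>/state/v<DATABASE_FORMAT_VERSION>/mainnet".
let path = config.db_path(Network::Mainnet);
assert!(path.ends_with("mainnet"));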
/// Construct a config for an ephemeral database
@@ -123,92 +83,6 @@ impl Config {
..Config::default()
}
}
/// Calculate the database's share of `open_file_limit`
fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
// Give the DB half the files, and reserve half the files for peers
(open_file_limit - Config::RESERVED_FILE_COUNT) / 2
}
/// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
/// If that fails, try `MIN_OPEN_FILE_LIMIT`.
///
/// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
/// unchanged.
///
/// Returns the current limit, after any successful increases.
///
/// # Panics
///
/// If the open file limit cannot be increased to `MIN_OPEN_FILE_LIMIT`.
fn increase_open_file_limit() -> u64 {
// `increase_nofile_limit` doesn't do anything on Windows in rlimit 0.7.0.
//
// On Windows, the default limit is:
// - 512 high-level stream I/O files (via the C standard functions), and
// - 8192 low-level I/O files (via the Unix C functions).
// https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
//
// If we need more high-level I/O files on Windows,
// use `setmaxstdio` and `getmaxstdio` from the `rlimit` crate:
// https://docs.rs/rlimit/latest/rlimit/#windows
//
// Then panic if `setmaxstdio` fails to set the minimum value,
// and `getmaxstdio` is below the minimum value.
// We try setting the ideal limit, then the minimum limit.
let current_limit = match increase_nofile_limit(Config::IDEAL_OPEN_FILE_LIMIT) {
Ok(current_limit) => current_limit,
Err(limit_error) => {
info!(
?limit_error,
min_limit = ?Config::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT,
"unable to increase the open file limit, \
assuming Zebra can open a minimum number of files"
);
return Config::MIN_OPEN_FILE_LIMIT;
}
};
if current_limit < Config::MIN_OPEN_FILE_LIMIT {
panic!(
"open file limit too low: \
unable to set the number of open files to {}, \
the minimum number of files required by Zebra. \
Current limit is {:?}. \
Hint: Increase the open file limit to {} before launching Zebra",
Config::MIN_OPEN_FILE_LIMIT,
current_limit,
Config::IDEAL_OPEN_FILE_LIMIT
);
} else if current_limit < Config::IDEAL_OPEN_FILE_LIMIT {
warn!(
?current_limit,
min_limit = ?Config::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT,
"the maximum number of open files is below Zebra's ideal limit. \
Hint: Increase the open file limit to {} before launching Zebra",
Config::IDEAL_OPEN_FILE_LIMIT
);
} else if cfg!(windows) {
info!(
min_limit = ?Config::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT,
"assuming the open file limit is high enough for Zebra",
);
} else {
info!(
?current_limit,
min_limit = ?Config::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT,
"the open file limit is high enough for Zebra",
);
}
current_limit
}
}
impl Default for Config {

View File

@@ -12,6 +12,9 @@
#![doc(html_logo_url = "https://www.zfnd.org/images/zebra-icon.png")]
#![doc(html_root_url = "https://doc.zebra.zfnd.org/zebra_state")]
#[macro_use]
extern crate tracing;
#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;
mod config;

View File

@@ -1,4 +1,9 @@
//! The primary implementation of the `zebra_state::Service` built upon rocksdb
//!
//! # Correctness
//!
//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
//! be incremented each time the database format (column, serialization, etc) changes.
use std::{
borrow::Borrow,
@@ -25,8 +30,8 @@ use crate::{
service::{
check,
finalized_state::{
disk_db::{ReadDisk, WriteDisk},
disk_format::{FromDisk, IntoDisk, TransactionLocation},
disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
disk_format::{FromDisk, TransactionLocation},
},
QueuedFinalized,
},
@@ -44,87 +49,44 @@ mod tests;
/// The finalized part of the chain state, stored in the db.
pub struct FinalizedState {
/// The underlying database.
db: DiskDb,
/// Queued blocks that arrived out of order, indexed by their parent block hash.
queued_by_prev_hash: HashMap<block::Hash, QueuedFinalized>,
/// A metric tracking the maximum height that's currently in `queued_by_prev_hash`.
///
/// Set to `f64::NAN` if `queued_by_prev_hash` is empty, because Grafana shows NaNs
/// as a break in the graph.
max_queued_height: f64,
db: rocksdb::DB,
ephemeral: bool,
/// The configured stop height.
///
/// Commit blocks to the finalized state up to this height, then exit Zebra.
debug_stop_at_height: Option<block::Height>,
/// The configured network.
network: Network,
}
impl FinalizedState {
pub fn new(config: &Config, network: Network) -> Self {
let (path, db_options) = config.db_config(network);
// Note: The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
// be incremented each time the database format (column, serialization, etc) changes.
let column_families = vec![
rocksdb::ColumnFamilyDescriptor::new("hash_by_height", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("height_by_hash", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("block_by_height", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("tx_by_hash", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("utxo_by_outpoint", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sapling_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("orchard_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sapling_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("orchard_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_note_commitment_tree", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new(
"sapling_note_commitment_tree",
db_options.clone(),
),
rocksdb::ColumnFamilyDescriptor::new(
"orchard_note_commitment_tree",
db_options.clone(),
),
rocksdb::ColumnFamilyDescriptor::new("history_tree", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("tip_chain_value_pool", db_options.clone()),
];
let db_result = rocksdb::DB::open_cf_descriptors(&db_options, &path, column_families);
let db = match db_result {
Ok(d) => {
tracing::info!("Opened Zebra state cache at {}", path.display());
d
}
// TODO: provide a different hint if the disk is full, see #1623
Err(e) => panic!(
"Opening database {:?} failed: {:?}. \
Hint: Check if another zebrad process is running. \
Try changing the state cache_dir in the Zebra config.",
path, e,
),
};
let db = DiskDb::new(config, network);
let new_state = Self {
queued_by_prev_hash: HashMap::new(),
max_queued_height: f64::NAN,
db,
ephemeral: config.ephemeral,
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
network,
};
// TODO: remove these extra logs once bugs like #2905 are fixed
tracing::info!("reading cached tip height");
if let Some(tip_height) = new_state.finalized_tip_height() {
tracing::info!(?tip_height, "loaded cached tip height");
if new_state.is_at_stop_height(tip_height) {
let debug_stop_at_height = new_state
.debug_stop_at_height
.expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some");
tracing::info!("reading cached tip hash");
let tip_hash = new_state.finalized_tip_hash();
if tip_height > debug_stop_at_height {
@@ -145,7 +107,6 @@ impl FinalizedState {
// RocksDB can do a cleanup when column families are opened.
// So we want to drop it before we exit.
tracing::info!("closing cached state");
std::mem::drop(new_state);
Self::exit_process();
@@ -232,14 +193,6 @@ impl FinalizedState {
self.tip().map(|(height, _)| height)
}
fn is_empty(&self, cf: &rocksdb::ColumnFamily) -> bool {
// use iterator to check if it's empty
!self
.db
.iterator_cf(cf, rocksdb::IteratorMode::Start)
.valid()
}
/// Immediately commit `finalized` to the finalized state.
///
/// This can be called either by the non-finalized state (when finalizing
@@ -285,7 +238,7 @@ impl FinalizedState {
let tip_chain_value_pool = self.db.cf_handle("tip_chain_value_pool").unwrap();
// Assert that callers (including unit tests) get the chain order correct
if self.is_empty(hash_by_height) {
if self.db.is_empty(hash_by_height) {
assert_eq!(
GENESIS_PREVIOUS_BLOCK_HASH, finalized.block.header.previous_block_hash,
"the first block added to an empty state must be a genesis block, source: {}",
@@ -346,8 +299,8 @@ impl FinalizedState {
// the genesis case.
// If the closure returns an error, it will be propagated and the batch will not be written
// to the DB afterwards.
let prepare_commit = || -> Result<rocksdb::WriteBatch, BoxError> {
let mut batch = rocksdb::WriteBatch::default();
let prepare_commit = || -> Result<DiskWriteBatch, BoxError> {
let mut batch = DiskWriteBatch::new();
// Index the block
batch.zs_insert(hash_by_height, height, hash);
@@ -413,7 +366,7 @@ impl FinalizedState {
if let Some(utxo) = self.utxo(outpoint) {
all_utxos_spent_by_block.insert(*outpoint, utxo);
}
batch.delete_cf(utxo_by_outpoint, outpoint.as_bytes());
batch.zs_delete(utxo_by_outpoint, outpoint);
}
// Coinbase inputs represent new coins,
// so there are no UTXOs to mark as spent.
@@ -505,6 +458,7 @@ impl FinalizedState {
tracing::trace!(?source, "committed block from");
// TODO: move the stop height check to the syncer (#3442)
if result.is_ok() && self.is_at_stop_height(height) {
tracing::info!(?source, "committed block from");
tracing::info!(
@@ -513,9 +467,8 @@ impl FinalizedState {
"stopping at configured height, flushing database to disk"
);
self.shutdown();
self.db.shutdown();
// TODO: replace with a graceful shutdown (#1678)
Self::exit_process();
}
@@ -525,7 +478,8 @@ impl FinalizedState {
/// Exit the host process.
///
/// Designed for debugging and tests.
/// TODO: replace with a graceful shutdown (#1678)
///
/// TODO: move the stop height check to the syncer (#3442)
fn exit_process() -> ! {
tracing::info!("exiting Zebra");
@@ -582,7 +536,7 @@ impl FinalizedState {
pub fn tip(&self) -> Option<(block::Height, block::Hash)> {
let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
self.db
.iterator_cf(hash_by_height, rocksdb::IteratorMode::End)
.reverse_iterator(hash_by_height)
.next()
.map(|(height_bytes, hash_bytes)| {
let height = block::Height::from_bytes(height_bytes);
@@ -754,32 +708,6 @@ impl FinalizedState {
}
}
/// If the database is `ephemeral`, delete it.
fn delete_ephemeral(&self) {
if self.ephemeral {
let path = self.db.path();
tracing::info!(cache_path = ?path, "removing temporary database files");
// We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
// but the Zcash blockchain might not fit in memory. So we just
// delete the database files instead.
//
// We'd like to call `DB::destroy` here, but calling destroy on a
// live DB is undefined behaviour:
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
//
// So we assume that all the database files are under `path`, and
// delete them using standard filesystem APIs. Deleting open files
// might cause errors on non-Unix platforms, so we ignore the result.
// (The OS will delete them eventually anyway.)
let res = std::fs::remove_dir_all(path);
// TODO: downgrade to debug once bugs like #2905 are fixed
// but leave any errors at "info" level
tracing::info!(?res, "removed temporary database files");
}
}
/// Returns the `Path` where the files used by this database are located.
#[allow(dead_code)]
pub fn path(&self) -> &Path {
@@ -793,104 +721,6 @@ impl FinalizedState {
.zs_get(value_pool_cf, &())
.unwrap_or_else(ValueBalance::zero)
}
/// Allows setting up a fake value pool in the database, for testing purposes.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn set_current_value_pool(&self, fake_value_pool: ValueBalance<NonNegative>) {
let mut batch = rocksdb::WriteBatch::default();
let value_pool_cf = self.db.cf_handle("tip_chain_value_pool").unwrap();
batch.zs_insert(value_pool_cf, (), fake_value_pool);
self.db.write(batch).unwrap();
}
/// Artificially prime the note commitment tree anchor sets with anchors
/// referenced in a block, for testing purposes _only_.
#[cfg(test)]
pub fn populate_with_anchors(&self, block: &Block) {
let mut batch = rocksdb::WriteBatch::default();
let sprout_anchors = self.db.cf_handle("sprout_anchors").unwrap();
let sapling_anchors = self.db.cf_handle("sapling_anchors").unwrap();
let orchard_anchors = self.db.cf_handle("orchard_anchors").unwrap();
for transaction in block.transactions.iter() {
// Sprout
for joinsplit in transaction.sprout_groth16_joinsplits() {
batch.zs_insert(
sprout_anchors,
joinsplit.anchor,
sprout::tree::NoteCommitmentTree::default(),
);
}
// Sapling
for anchor in transaction.sapling_anchors() {
batch.zs_insert(sapling_anchors, anchor, ());
}
// Orchard
if let Some(orchard_shielded_data) = transaction.orchard_shielded_data() {
batch.zs_insert(orchard_anchors, orchard_shielded_data.shared_anchor, ());
}
}
self.db.write(batch).unwrap();
}
/// Shut down the database, cleaning up background tasks and ephemeral data.
fn shutdown(&mut self) {
// Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
//
// Zebra's data should be fine if we don't clean up, because:
// - the database flushes regularly anyway
// - Zebra commits each block in a database transaction, so any incomplete blocks get rolled back
// - ephemeral files are placed in the OS temp dir and should be cleaned up automatically eventually
tracing::info!("flushing database to disk");
self.db.flush().expect("flush is successful");
// But we should call `cancel_all_background_work` before Zebra exits.
// If we don't, we see these kinds of errors:
// ```
// pthread lock: Invalid argument
// pure virtual method called
// terminate called without an active exception
// pthread destroy mutex: Device or resource busy
// Aborted (core dumped)
// ```
//
// The RocksDB wiki says:
// > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
// >
// > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
// > You can speed up the waiting by calling CancelAllBackgroundWork().
//
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
tracing::info!("stopping background database tasks");
self.db.cancel_all_background_work(true);
// We'd like to drop the database before deleting its files,
// because that closes the column families and the database correctly.
// But Rust's ownership rules make that difficult,
// so we just flush and delete ephemeral data instead.
//
// The RocksDB wiki says:
// > rocksdb::DB instances need to be destroyed before your main function exits.
// > RocksDB instances usually depend on some internal static variables.
// > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
//
// https://github.com/facebook/rocksdb/wiki/Known-Issues
//
// But our current code doesn't seem to cause any issues.
// We might want to explicitly drop the database as part of graceful shutdown (#1678).
self.delete_ephemeral();
}
}
impl Drop for FinalizedState {
fn drop(&mut self) {
self.shutdown();
}
}
fn block_precommit_metrics(block: &Block, hash: block::Hash, height: block::Height) {

View File

@@ -6,9 +6,18 @@ use std::sync::Arc;
use proptest::prelude::*;
use zebra_chain::block;
use zebra_chain::{
amount::NonNegative,
block::{self, Block},
sprout,
value_balance::ValueBalance,
};
use crate::service::finalized_state::disk_format::{FromDisk, IntoDisk, TransactionLocation};
use crate::service::finalized_state::{
disk_db::{DiskWriteBatch, WriteDisk},
disk_format::{FromDisk, IntoDisk, TransactionLocation},
FinalizedState,
};
impl Arbitrary for TransactionLocation {
type Parameters = ();
@@ -84,3 +93,47 @@ where
assert_round_trip_arc(Arc::new(input.clone()));
assert_round_trip(input);
}
impl FinalizedState {
/// Allows setting up a fake value pool in the database, for testing purposes.
pub fn set_current_value_pool(&self, fake_value_pool: ValueBalance<NonNegative>) {
let mut batch = DiskWriteBatch::new();
let value_pool_cf = self.db.cf_handle("tip_chain_value_pool").unwrap();
batch.zs_insert(value_pool_cf, (), fake_value_pool);
self.db.write(batch).unwrap();
}
/// Artificially prime the note commitment tree anchor sets with anchors
/// referenced in a block, for testing purposes _only_.
pub fn populate_with_anchors(&self, block: &Block) {
let mut batch = DiskWriteBatch::new();
let sprout_anchors = self.db.cf_handle("sprout_anchors").unwrap();
let sapling_anchors = self.db.cf_handle("sapling_anchors").unwrap();
let orchard_anchors = self.db.cf_handle("orchard_anchors").unwrap();
for transaction in block.transactions.iter() {
// Sprout
for joinsplit in transaction.sprout_groth16_joinsplits() {
batch.zs_insert(
sprout_anchors,
joinsplit.anchor,
sprout::tree::NoteCommitmentTree::default(),
);
}
// Sapling
for anchor in transaction.sapling_anchors() {
batch.zs_insert(sapling_anchors, anchor, ());
}
// Orchard
if let Some(orchard_shielded_data) = transaction.orchard_shielded_data() {
batch.zs_insert(orchard_anchors, orchard_shielded_data.shared_anchor, ());
}
}
self.db.write(batch).unwrap();
}
}
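
A hedged sketch of a test that drives these helpers, assuming `ValueBalance` is `Clone + PartialEq` and that the value pool getter on `FinalizedState` is named `current_value_pool` (the getter's name is not shown in this diff):

#[test]
fn fake_value_pool_is_readable() {
    use zebra_chain::parameters::Network;
    use crate::Config;

    // An ephemeral state deletes its files on drop, so the test is self-cleaning.
    let state = FinalizedState::new(&Config::ephemeral(), Network::Mainnet);

    // Seed the database with a zero-valued fake pool, then read it back.
    let fake_pool = ValueBalance::<NonNegative>::zero();
    state.set_current_value_pool(fake_pool.clone());
    assert_eq!(state.current_value_pool(), fake_pool);
}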

View File

@@ -3,10 +3,39 @@
//! This module makes sure that:
//! - all disk writes happen inside a RocksDB transaction, and
//! - format-specific invariants are maintained.
//!
//! # Correctness
//!
//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
//! be incremented each time the database format (column, serialization, etc) changes.
use std::fmt::Debug;
use std::{fmt::Debug, path::Path};
use crate::service::finalized_state::disk_format::{FromDisk, IntoDisk};
use rlimit::increase_nofile_limit;
use zebra_chain::parameters::Network;
use crate::{
service::finalized_state::disk_format::{FromDisk, IntoDisk},
Config,
};
/// Wrapper struct to ensure low-level database access goes through the correct API.
pub struct DiskDb {
/// The inner RocksDB database.
db: rocksdb::DB,
/// The configured temporary database setting.
///
/// If true, the database files are deleted on drop.
ephemeral: bool,
}
/// Wrapper struct to ensure low-level database writes go through the correct API.
pub struct DiskWriteBatch {
/// The inner RocksDB write batch.
batch: rocksdb::WriteBatch,
}
/// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently
/// defined format
@@ -24,7 +53,7 @@ pub trait WriteDisk {
K: IntoDisk + Debug;
}
impl WriteDisk for rocksdb::WriteBatch {
impl WriteDisk for DiskWriteBatch {
fn zs_insert<K, V>(&mut self, cf: &rocksdb::ColumnFamily, key: K, value: V)
where
K: IntoDisk + Debug,
@@ -32,7 +61,7 @@ impl WriteDisk for rocksdb::WriteBatch {
{
let key_bytes = key.as_bytes();
let value_bytes = value.as_bytes();
self.put_cf(cf, key_bytes, value_bytes);
self.batch.put_cf(cf, key_bytes, value_bytes);
}
fn zs_delete<K>(&mut self, cf: &rocksdb::ColumnFamily, key: K)
@@ -40,7 +69,7 @@ impl WriteDisk for rocksdb::WriteBatch {
K: IntoDisk + Debug,
{
let key_bytes = key.as_bytes();
self.delete_cf(cf, key_bytes);
self.batch.delete_cf(cf, key_bytes);
}
}
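
With the wrapper in place, callers stage typed writes through `WriteDisk`, then hand the whole batch to `DiskDb::write` (defined later in this file), so no raw `rocksdb::WriteBatch` escapes the module. A minimal sketch, assuming `db: DiskDb` and that `height`, `hash`, and `stale_height` are values of types implementing `IntoDisk`:

let hash_by_height = db.cf_handle("hash_by_height").expect("column family exists");
let mut batch = DiskWriteBatch::new();
// Stage a typed insert and a typed delete; keys and values
// are serialized through IntoDisk.
batch.zs_insert(hash_by_height, height, hash);
batch.zs_delete(hash_by_height, stale_height);
// The staged operations are applied in a single atomic RocksDB write.
db.write(batch).expect("unexpected database failure");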
@@ -59,7 +88,7 @@ pub trait ReadDisk {
K: IntoDisk;
}
impl ReadDisk for rocksdb::DB {
impl ReadDisk for DiskDb {
fn zs_get<K, V>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option<V>
where
K: IntoDisk,
@@ -71,6 +100,7 @@ impl ReadDisk for rocksdb::DB {
// value, because we're going to deserialize it anyway, which avoids an
// extra copy
let value_bytes = self
.db
.get_pinned_cf(cf, key_bytes)
.expect("expected that disk errors would not occur");
@@ -85,8 +115,312 @@ impl ReadDisk for rocksdb::DB {
// We use `get_pinned_cf` to avoid taking ownership of the serialized
// value, because we don't use the value at all. This avoids an extra copy.
self.get_pinned_cf(cf, key_bytes)
self.db
.get_pinned_cf(cf, key_bytes)
.expect("expected that disk errors would not occur")
.is_some()
}
}
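
Reads go through the same typed layer: `zs_get` pins the raw value and deserializes it via `FromDisk`, so callers never handle bytes directly. A hedged sketch, assuming `db: DiskDb`, a `hash: block::Hash`, and that `block::Height` implements `FromDisk`:

let height_by_hash = db.cf_handle("height_by_hash").expect("column family exists");
// Returns None when the hash is not in the finalized state.
let height: Option<block::Height> = db.zs_get(height_by_hash, &hash);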
impl DiskWriteBatch {
pub fn new() -> Self {
DiskWriteBatch {
batch: rocksdb::WriteBatch::default(),
}
}
}
impl DiskDb {
/// The ideal open file limit for Zebra
const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;
/// The minimum number of open files for Zebra to operate normally. Also used
/// as the default open file limit, when the OS doesn't tell us how many
/// files we can use.
///
/// We want 100+ file descriptors for peers, and 100+ for the database.
///
/// On Windows, the default limit is 512 high-level I/O files, and 8192
/// low-level I/O files:
/// https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
const MIN_OPEN_FILE_LIMIT: u64 = 512;
/// The number of files used internally by Zebra.
///
/// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
/// stdio (3), and other OS facilities (2+).
const RESERVED_FILE_COUNT: u64 = 48;
pub fn new(config: &Config, network: Network) -> DiskDb {
let path = config.db_path(network);
let db_options = DiskDb::options();
let column_families = vec![
rocksdb::ColumnFamilyDescriptor::new("hash_by_height", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("height_by_hash", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("block_by_height", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("tx_by_hash", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("utxo_by_outpoint", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sapling_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("orchard_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sapling_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("orchard_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_note_commitment_tree", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new(
"sapling_note_commitment_tree",
db_options.clone(),
),
rocksdb::ColumnFamilyDescriptor::new(
"orchard_note_commitment_tree",
db_options.clone(),
),
rocksdb::ColumnFamilyDescriptor::new("history_tree", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("tip_chain_value_pool", db_options.clone()),
];
// TODO: move opening the database to a blocking thread (#2188)
let db_result = rocksdb::DB::open_cf_descriptors(&db_options, &path, column_families);
match db_result {
Ok(db) => {
info!("Opened Zebra state cache at {}", path.display());
DiskDb {
db,
ephemeral: config.ephemeral,
}
}
// TODO: provide a different hint if the disk is full, see #1623
Err(e) => panic!(
"Opening database {:?} failed: {:?}. \
Hint: Check if another zebrad process is running. \
Try changing the state cache_dir in the Zebra config.",
path, e,
),
}
}
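
A hedged construction sketch; using an ephemeral config keeps the temporary files self-cleaning when the `DiskDb` is dropped:

let config = Config::ephemeral();
let db = DiskDb::new(&config, Network::Mainnet);
// All the column families listed above are created on first open.
assert!(db.cf_handle("hash_by_height").is_some());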
/// Returns the `Path` where the files used by this database are located.
pub fn path(&self) -> &Path {
self.db.path()
}
/// Returns the column family handle for `cf_name`.
pub fn cf_handle(&self, cf_name: &str) -> Option<&rocksdb::ColumnFamily> {
self.db.cf_handle(cf_name)
}
/// Returns a forward iterator over the key-value pairs in `cf_handle`, starting from the first key.
pub fn forward_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator {
self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::Start)
}
/// Returns a reverse iterator over the key-value pairs in `cf_handle`, starting from the last key.
pub fn reverse_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator {
self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::End)
}
/// Returns true if `cf_handle` does not contain any entries.
pub fn is_empty(&self, cf_handle: &rocksdb::ColumnFamily) -> bool {
// Empty column families return invalid iterators.
!self.forward_iterator(cf_handle).valid()
}
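
These narrow iterator APIs cover every caller in this diff; for example, `FinalizedState::tip` (above) reads the first item of the reverse iterator over `hash_by_height`. A minimal sketch, assuming `db: DiskDb`:

let hash_by_height = db.cf_handle("hash_by_height").expect("column family exists");
if db.is_empty(hash_by_height) {
    // A fresh database: the next committed block must be the genesis block.
}
// Otherwise, the last (height, hash) entry is the finalized tip.
let tip_entry = db.reverse_iterator(hash_by_height).next();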
/// Writes `batch` to the database.
pub fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
// TODO: move writing to the database to a blocking thread (#2188)
self.db.write(batch.batch)
}
/// Returns the database options for the finalized state database.
fn options() -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let open_file_limit = DiskDb::increase_open_file_limit();
let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit);
// If the current limit is very large, set the DB limit using the ideal limit
let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT)
.try_into()
.expect("ideal open file limit fits in a c_int");
let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);
opts.set_max_open_files(db_file_limit);
opts
}
/// Calculate the database's share of `open_file_limit`
fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
// Give the DB half the files, and reserve half the files for peers
(open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2
}
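
A worked example of this budget, written as assertions a test in this module could make (illustrative only, since the helper is private):

// With the ideal limit: (1024 - 48) / 2 = 488 files for the database.
assert_eq!(DiskDb::get_db_open_file_limit(1024), 488);
// With the minimum limit: (512 - 48) / 2 = 232 files for the database.
assert_eq!(DiskDb::get_db_open_file_limit(512), 232);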
/// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
/// If that fails, try `MIN_OPEN_FILE_LIMIT`.
///
/// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
/// unchanged.
///
/// Returns the current limit, after any successful increases.
///
/// # Panics
///
/// If the open file limit cannot be increased to `MIN_OPEN_FILE_LIMIT`.
fn increase_open_file_limit() -> u64 {
// `increase_nofile_limit` doesn't do anything on Windows in rlimit 0.7.0.
//
// On Windows, the default limit is:
// - 512 high-level stream I/O files (via the C standard functions), and
// - 8192 low-level I/O files (via the Unix C functions).
// https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
//
// If we need more high-level I/O files on Windows,
// use `setmaxstdio` and `getmaxstdio` from the `rlimit` crate:
// https://docs.rs/rlimit/latest/rlimit/#windows
//
// Then panic if `setmaxstdio` fails to set the minimum value,
// and `getmaxstdio` is below the minimum value.
// We try setting the ideal limit, then the minimum limit.
let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) {
Ok(current_limit) => current_limit,
Err(limit_error) => {
info!(
?limit_error,
min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
"unable to increase the open file limit, \
assuming Zebra can open a minimum number of files"
);
return DiskDb::MIN_OPEN_FILE_LIMIT;
}
};
if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT {
panic!(
"open file limit too low: \
unable to set the number of open files to {}, \
the minimum number of files required by Zebra. \
Current limit is {:?}. \
Hint: Increase the open file limit to {} before launching Zebra",
DiskDb::MIN_OPEN_FILE_LIMIT,
current_limit,
DiskDb::IDEAL_OPEN_FILE_LIMIT
);
} else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT {
warn!(
?current_limit,
min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
"the maximum number of open files is below Zebra's ideal limit. \
Hint: Increase the open file limit to {} before launching Zebra",
DiskDb::IDEAL_OPEN_FILE_LIMIT
);
} else if cfg!(windows) {
info!(
min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
"assuming the open file limit is high enough for Zebra",
);
} else {
info!(
?current_limit,
min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
"the open file limit is high enough for Zebra",
);
}
current_limit
}
/// Shut down the database, cleaning up background tasks and ephemeral data.
///
/// TODO: make private after the stop height check has moved to the syncer (#3442)
/// TODO: move shutting down the database to a blocking thread (#2188)
pub(crate) fn shutdown(&mut self) {
// Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
//
// Zebra's data should be fine if we don't clean up, because:
// - the database flushes regularly anyway
// - Zebra commits each block in a database transaction, so any incomplete blocks get rolled back
// - ephemeral files are placed in the OS temp dir and should be cleaned up automatically eventually
info!("flushing database to disk");
self.db.flush().expect("flush is successful");
// But we should call `cancel_all_background_work` before Zebra exits.
// If we don't, we see these kinds of errors:
// ```
// pthread lock: Invalid argument
// pure virtual method called
// terminate called without an active exception
// pthread destroy mutex: Device or resource busy
// Aborted (core dumped)
// ```
//
// The RocksDB wiki says:
// > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
// >
// > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
// > You can speed up the waiting by calling CancelAllBackgroundWork().
//
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
info!("stopping background database tasks");
self.db.cancel_all_background_work(true);
// We'd like to drop the database before deleting its files,
// because that closes the column families and the database correctly.
// But Rust's ownership rules make that difficult,
// so we just flush and delete ephemeral data instead.
//
// The RocksDB wiki says:
// > rocksdb::DB instances need to be destroyed before your main function exits.
// > RocksDB instances usually depend on some internal static variables.
// > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
//
// https://github.com/facebook/rocksdb/wiki/Known-Issues
//
// But our current code doesn't seem to cause any issues.
// We might want to explicitly drop the database as part of graceful shutdown (#1678).
self.delete_ephemeral();
}
/// If the database is `ephemeral`, delete it.
fn delete_ephemeral(&self) {
if self.ephemeral {
let path = self.path();
info!(cache_path = ?path, "removing temporary database files");
// We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
// but the Zcash blockchain might not fit in memory. So we just
// delete the database files instead.
//
// We'd like to call `DB::destroy` here, but calling destroy on a
// live DB is undefined behaviour:
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
//
// So we assume that all the database files are under `path`, and
// delete them using standard filesystem APIs. Deleting open files
// might cause errors on non-Unix platforms, so we ignore the result.
// (The OS will delete them eventually anyway.)
let res = std::fs::remove_dir_all(path);
// TODO: downgrade to debug once bugs like #2905 are fixed
// but leave any errors at "info" level
info!(?res, "removed temporary database files");
}
}
}
impl Drop for DiskDb {
fn drop(&mut self) {
self.shutdown();
}
}

View File

@@ -1,4 +1,9 @@
//! Module defining the serialization format for finalized data.
//!
//! # Correctness
//!
//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
//! be incremented each time the database format (column, serialization, etc) changes.
use std::{collections::BTreeMap, convert::TryInto, fmt::Debug, sync::Arc};
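
The code above relies on this module's round-trip contract: `IntoDisk::as_bytes` produces the canonical byte encoding, and `FromDisk::from_bytes` reverses it. A hedged sketch, assuming `block::Height` implements both traits and the byte types line up as in the `assert_round_trip` helpers exercised in the test module above:

use zebra_chain::block;

let height = block::Height(1_000_000);
// Serialize to the on-disk format, then parse the bytes back.
let bytes = height.as_bytes();
let round_trip = block::Height::from_bytes(bytes);
assert_eq!(height, round_trip);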