6. refactor(state): prepare finalized state for shared read-only access (#3810)
* Move the legacy chain check to the `check` module And move `populated_state` to the `arbitrary` module. * Cleanup imports * Document the state service struct * Split state block iter into its own module * Prepare the finalized state for read-only state * Add a forced shutdown mode, used in test code before forced exits * Document the small database drop race condition window
This commit is contained in:
parent
04e339f097
commit
199267bfa3
|
@ -28,7 +28,7 @@ use zebra_chain::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
service::{check, finalized_state::disk_db::DiskDb, QueuedFinalized},
|
service::{check, QueuedFinalized},
|
||||||
BoxError, Config, FinalizedBlock,
|
BoxError, Config, FinalizedBlock,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -42,9 +42,14 @@ mod arbitrary;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
|
// TODO: replace with a ZebraDb struct
|
||||||
|
pub(super) use disk_db::DiskDb;
|
||||||
|
|
||||||
/// The finalized part of the chain state, stored in the db.
|
/// The finalized part of the chain state, stored in the db.
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct FinalizedState {
|
pub struct FinalizedState {
|
||||||
/// The underlying database.
|
/// The underlying database.
|
||||||
|
// TODO: replace with a ZebraDb struct
|
||||||
db: DiskDb,
|
db: DiskDb,
|
||||||
|
|
||||||
/// Queued blocks that arrived out of order, indexed by their parent block hash.
|
/// Queued blocks that arrived out of order, indexed by their parent block hash.
|
||||||
|
@ -301,7 +306,8 @@ impl FinalizedState {
|
||||||
"stopping at configured height, flushing database to disk"
|
"stopping at configured height, flushing database to disk"
|
||||||
);
|
);
|
||||||
|
|
||||||
self.db.shutdown();
|
// We're just about to do a forced exit, so it's ok to do a forced db shutdown
|
||||||
|
self.db.shutdown(true);
|
||||||
|
|
||||||
Self::exit_process();
|
Self::exit_process();
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
|
//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
|
||||||
//! be incremented each time the database format (column, serialization, etc) changes.
|
//! be incremented each time the database format (column, serialization, etc) changes.
|
||||||
|
|
||||||
use std::{fmt::Debug, path::Path};
|
use std::{fmt::Debug, path::Path, sync::Arc};
|
||||||
|
|
||||||
use rlimit::increase_nofile_limit;
|
use rlimit::increase_nofile_limit;
|
||||||
|
|
||||||
|
@ -24,9 +24,13 @@ use crate::{
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
/// Wrapper struct to ensure low-level database access goes through the correct API.
|
/// Wrapper struct to ensure low-level database access goes through the correct API.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
pub struct DiskDb {
|
pub struct DiskDb {
|
||||||
/// The inner RocksDB database.
|
/// The shared inner RocksDB database.
|
||||||
db: rocksdb::DB,
|
///
|
||||||
|
/// RocksDB allows reads and writes via a shared reference.
|
||||||
|
/// Only column family changes and [`Drop`] require exclusive access.
|
||||||
|
db: Arc<rocksdb::DB>,
|
||||||
|
|
||||||
/// The configured temporary database setting.
|
/// The configured temporary database setting.
|
||||||
///
|
///
|
||||||
|
@ -95,6 +99,22 @@ pub trait ReadDisk {
|
||||||
K: IntoDisk;
|
K: IntoDisk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PartialEq for DiskDb {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
if self.db.path() == other.db.path() {
|
||||||
|
assert_eq!(
|
||||||
|
self.ephemeral, other.ephemeral,
|
||||||
|
"database with same path but different ephemeral configs",
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eq for DiskDb {}
|
||||||
|
|
||||||
impl ReadDisk for DiskDb {
|
impl ReadDisk for DiskDb {
|
||||||
fn zs_get<K, V>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option<V>
|
fn zs_get<K, V>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option<V>
|
||||||
where
|
where
|
||||||
|
@ -199,7 +219,7 @@ impl DiskDb {
|
||||||
info!("Opened Zebra state cache at {}", path.display());
|
info!("Opened Zebra state cache at {}", path.display());
|
||||||
|
|
||||||
let db = DiskDb {
|
let db = DiskDb {
|
||||||
db,
|
db: Arc::new(db),
|
||||||
ephemeral: config.ephemeral,
|
ephemeral: config.ephemeral,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -382,9 +402,30 @@ impl DiskDb {
|
||||||
|
|
||||||
/// Shut down the database, cleaning up background tasks and ephemeral data.
|
/// Shut down the database, cleaning up background tasks and ephemeral data.
|
||||||
///
|
///
|
||||||
|
/// If `force` is true, clean up regardless of any shared references.
|
||||||
|
/// `force` can cause errors accessing the database from other shared references.
|
||||||
|
/// It should only be used in debugging or test code, immediately before a manual shutdown.
|
||||||
|
///
|
||||||
/// TODO: make private after the stop height check has moved to the syncer (#3442)
|
/// TODO: make private after the stop height check has moved to the syncer (#3442)
|
||||||
/// move shutting down the database to a blocking thread (#2188)
|
/// move shutting down the database to a blocking thread (#2188),
|
||||||
pub(crate) fn shutdown(&mut self) {
|
/// and remove `force` and the manual flush
|
||||||
|
pub(crate) fn shutdown(&mut self, force: bool) {
|
||||||
|
// Prevent a race condition where another thread clones the Arc,
|
||||||
|
// right after we've checked we're the only holder of the Arc.
|
||||||
|
//
|
||||||
|
// There is still a small race window after the guard is dropped,
|
||||||
|
// but if the race happens, it will only cause database errors during shutdown.
|
||||||
|
let clone_prevention_guard = Arc::get_mut(&mut self.db);
|
||||||
|
|
||||||
|
if clone_prevention_guard.is_none() && !force {
|
||||||
|
debug!(
|
||||||
|
"dropping cloned DiskDb, \
|
||||||
|
but keeping shared database until the last reference is dropped",
|
||||||
|
);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
self.assert_default_cf_is_empty();
|
self.assert_default_cf_is_empty();
|
||||||
|
|
||||||
// Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
|
// Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
|
||||||
|
@ -430,12 +471,35 @@ impl DiskDb {
|
||||||
//
|
//
|
||||||
// But our current code doesn't seem to cause any issues.
|
// But our current code doesn't seem to cause any issues.
|
||||||
// We might want to explicitly drop the database as part of graceful shutdown (#1678).
|
// We might want to explicitly drop the database as part of graceful shutdown (#1678).
|
||||||
self.delete_ephemeral();
|
self.delete_ephemeral(force);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If the database is `ephemeral`, delete it.
|
/// If the database is `ephemeral`, delete it.
|
||||||
fn delete_ephemeral(&self) {
|
///
|
||||||
if self.ephemeral {
|
/// If `force` is true, clean up regardless of any shared references.
|
||||||
|
/// `force` can cause errors accessing the database from other shared references.
|
||||||
|
/// It should only be used in debugging or test code, immediately before a manual shutdown.
|
||||||
|
fn delete_ephemeral(&mut self, force: bool) {
|
||||||
|
if !self.ephemeral {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prevent a race condition where another thread clones the Arc,
|
||||||
|
// right after we've checked we're the only holder of the Arc.
|
||||||
|
//
|
||||||
|
// There is still a small race window after the guard is dropped,
|
||||||
|
// but if the race happens, it will only cause database errors during shutdown.
|
||||||
|
let clone_prevention_guard = Arc::get_mut(&mut self.db);
|
||||||
|
|
||||||
|
if clone_prevention_guard.is_none() && !force {
|
||||||
|
debug!(
|
||||||
|
"dropping cloned DiskDb, \
|
||||||
|
but keeping shared database files until the last reference is dropped",
|
||||||
|
);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let path = self.path();
|
let path = self.path();
|
||||||
info!(cache_path = ?path, "removing temporary database files");
|
info!(cache_path = ?path, "removing temporary database files");
|
||||||
|
|
||||||
|
@ -457,7 +521,6 @@ impl DiskDb {
|
||||||
// but leave any errors at "info" level
|
// but leave any errors at "info" level
|
||||||
info!(?res, "removed temporary database files");
|
info!(?res, "removed temporary database files");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// Check that the "default" column family is empty.
|
/// Check that the "default" column family is empty.
|
||||||
///
|
///
|
||||||
|
@ -476,6 +539,6 @@ impl DiskDb {
|
||||||
|
|
||||||
impl Drop for DiskDb {
|
impl Drop for DiskDb {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.shutdown();
|
self.shutdown(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue