feat(state): delete old database directories (#4586)
* delete old database directories
* check if state directory exists
* skip deleting when ephemeral
* split `check_and_delete_old_databases`
* move `check_and_delete_old_databases` to state
* spawn `check_and_delete_old_databases`
* simplify a bit
* fix(state): only delete old database directories inside the cache directory (#4631)
* Add function comments, tweak log
* Simplify version parsing
* Use spawn_blocking to launch the task on a separate thread, do the cleanup last
* Abort the cleanup task when Zebra exits
* Split directory deletion into its own function, handle ownership
* Rename cache_dir to state_dir
* If an outdated state directory is outside the cache directory, don't delete it
* Minimise diffs
* add test
* fix typos
* add `canonicalize` to test regex
* add another match to test

Co-authored-by: teor <teor@riseup.net>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent 961fcb621e
commit 769d069d0a
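For orientation before the diff: a minimal sketch of how the new public function is driven, pieced together from the `zebrad` start command changes below. `config` stands for the loaded zebrad configuration, and the surrounding task plumbing is elided.

// Sketch only, not the exact zebrad code. The cleanup runs via a blocking
// thread so directory removal does not stall the async runtime.
let old_databases_task_handle =
    zebra_state::check_and_delete_old_databases(config.state.clone());

// ... Zebra starts its other tasks and keeps syncing while the cleanup runs ...

// On shutdown, stop the cleanup task if it has not finished yet.
old_databases_task_handle.abort();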
@@ -1,6 +1,13 @@
-use std::path::PathBuf;
+//! Cached state configuration for Zebra.
+
+use std::{
+    fs::{canonicalize, remove_dir_all, DirEntry, ReadDir},
+    path::{Path, PathBuf},
+};
 
 use serde::{Deserialize, Serialize};
+use tokio::task::{spawn_blocking, JoinHandle};
+use tracing::Span;
 
 use zebra_chain::parameters::Network;
 
@@ -57,6 +64,13 @@ pub struct Config {
     ///
     /// Set to `None` by default: Zebra continues syncing indefinitely.
     pub debug_stop_at_height: Option<u32>,
+
+    /// Whether to delete the old database directories when present.
+    ///
+    /// Set to `true` by default. If this is set to `false`,
+    /// no check for old database versions will be made and nothing will be
+    /// deleted.
+    pub delete_old_database: bool,
 }
 
 fn gen_temp_path(prefix: &str) -> PathBuf {
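The new field is deserialized with the rest of the state config, so zebrad users would normally toggle it with a `delete_old_database = false` line in the `[state]` section of their config file. For library users of `zebra-state`, a small hypothetical example using the `Default` impl shown in the next hunk:

// Keep outdated state directories instead of deleting them at startup.
let state_config = zebra_state::Config {
    delete_old_database: false,
    ..zebra_state::Config::default()
};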
@@ -108,6 +122,125 @@ impl Default for Config {
             cache_dir,
             ephemeral: false,
             debug_stop_at_height: None,
+            delete_old_database: true,
         }
     }
 }
+
+// Cleaning up old database versions
+
+/// Spawns a task that checks if there are old database folders,
+/// and deletes them from the filesystem.
+///
+/// Iterate over the files and directories in the databases folder and delete if:
+/// - The state directory exists.
+/// - The entry is a directory.
+/// - The directory name has a prefix `v`.
+/// - The directory name without the prefix can be parsed as an unsigned number.
+/// - The parsed number is lower than the hardcoded `DATABASE_FORMAT_VERSION`.
+pub fn check_and_delete_old_databases(config: Config) -> JoinHandle<()> {
+    let current_span = Span::current();
+
+    spawn_blocking(move || {
+        current_span.in_scope(|| {
+            delete_old_databases(config);
+            info!("finished old database version cleanup task");
+        })
+    })
+}
+
+/// Check if there are old database folders and delete them from the filesystem.
+///
+/// See [`check_and_delete_old_databases`] for details.
+fn delete_old_databases(config: Config) {
+    if config.ephemeral || !config.delete_old_database {
+        return;
+    }
+
+    info!("checking for old database versions");
+
+    let state_dir = config.cache_dir.join("state");
+    if let Some(state_dir) = read_dir(&state_dir) {
+        for entry in state_dir.flatten() {
+            let deleted_state = check_and_delete_database(&config, &entry);
+
+            if let Some(deleted_state) = deleted_state {
+                info!(?deleted_state, "deleted outdated state directory");
+            }
+        }
+    }
+}
+
+/// Return a `ReadDir` for `dir`, after checking that `dir` exists and can be read.
+///
+/// Returns `None` if any operation fails.
+fn read_dir(dir: &Path) -> Option<ReadDir> {
+    if dir.exists() {
+        if let Ok(read_dir) = dir.read_dir() {
+            return Some(read_dir);
+        }
+    }
+    None
+}
+
+/// Check if `entry` is an old database directory, and delete it from the filesystem.
+/// See [`check_and_delete_old_databases`] for details.
+///
+/// If the directory was deleted, returns its path.
+fn check_and_delete_database(config: &Config, entry: &DirEntry) -> Option<PathBuf> {
+    let dir_name = parse_dir_name(entry)?;
+    let version_number = parse_version_number(&dir_name)?;
+
+    if version_number >= crate::constants::DATABASE_FORMAT_VERSION {
+        return None;
+    }
+
+    let outdated_path = entry.path();
+
+    // # Correctness
+    //
+    // Check that the path we're about to delete is inside the cache directory.
+    // If the user has symlinked the outdated state directory to a non-cache directory,
+    // we don't want to delete it, because it might contain other files.
+    //
+    // We don't attempt to guard against malicious symlinks created by attackers
+    // (TOCTOU attacks). Zebra should not be run with elevated privileges.
+    let cache_path = canonicalize(&config.cache_dir).ok()?;
+    let outdated_path = canonicalize(outdated_path).ok()?;
+
+    if !outdated_path.starts_with(&cache_path) {
+        info!(
+            skipped_path = ?outdated_path,
+            ?cache_path,
+            "skipped cleanup of outdated state directory: state is outside cache directory",
+        );
+
+        return None;
+    }
+
+    remove_dir_all(&outdated_path).ok().map(|()| outdated_path)
+}
+
+/// Check if `entry` is a directory with a valid UTF-8 name.
+/// (State directory names are guaranteed to be UTF-8.)
+///
+/// Returns `None` if any operation fails.
+fn parse_dir_name(entry: &DirEntry) -> Option<String> {
+    if let Ok(file_type) = entry.file_type() {
+        if file_type.is_dir() {
+            if let Ok(dir_name) = entry.file_name().into_string() {
+                return Some(dir_name);
+            }
+        }
+    }
+    None
+}
+
+/// Parse the state version number from `dir_name`.
+///
+/// Returns `None` if parsing fails, or the directory name is not in the expected format.
+fn parse_version_number(dir_name: &str) -> Option<u32> {
+    dir_name
+        .strip_prefix('v')
+        .and_then(|version| version.parse().ok())
+}
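To make the deletion rules above concrete: only directory names that look like old version markers are ever touched. A worked example of the `parse_version_number` logic, assuming for illustration that `DATABASE_FORMAT_VERSION` is 25 (the real value lives in the crate's `constants` module):

// "v1", "v24"  -> parse to 1 and 24, lower than 25, so those directories are deleted
// "v25"        -> parses to 25, not lower than the current version, kept
// "state", "vtmp", "lock" -> no `v` prefix or not a number, so they are kept
assert_eq!("v24".strip_prefix('v').and_then(|v| v.parse::<u32>().ok()), Some(24));
assert_eq!("vtmp".strip_prefix('v').and_then(|v| v.parse::<u32>().ok()), None::<u32>);

Even when a name matches, the `canonicalize` / `starts_with` guard in `check_and_delete_database` keeps anything that resolves outside the cache directory, such as a symlinked state.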
@@ -28,7 +28,7 @@ mod util;
 #[cfg(test)]
 mod tests;
 
-pub use config::Config;
+pub use config::{check_and_delete_old_databases, Config};
 pub use constants::MAX_BLOCK_REORG_HEIGHT;
 pub use error::{BoxError, CloneError, CommitBlockError, ValidateContextError};
 pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, ReadRequest, Request};
@@ -37,6 +37,8 @@
 //!   * contextually verifies blocks
 //!   * handles in-memory storage of multiple non-finalized chains
 //!   * handles permanent storage of the best finalized chain
+//! * Old State Version Cleanup Task
+//!   * deletes outdated state versions
 //! * Block Gossip Task
 //!   * runs in the background and continuously queries the state for
 //!     newly committed blocks to be gossiped to peers
@@ -218,6 +220,9 @@ impl StartCmd {
             .in_current_span(),
         );
 
+        let mut old_databases_task_handle =
+            zebra_state::check_and_delete_old_databases(config.state.clone());
+
         info!("spawned initial Zebra tasks");
 
         // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
@@ -235,6 +240,8 @@
         // startup tasks
         let groth16_download_handle_fused = (&mut groth16_download_handle).fuse();
         pin!(groth16_download_handle_fused);
+        let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
+        pin!(old_databases_task_handle_fused);
 
         // Wait for tasks to finish
         let exit_status = loop {
@@ -297,6 +304,16 @@ impl StartCmd {
                     exit_when_task_finishes = false;
                     Ok(())
                 }
+
+                // The same for the old databases task, we expect it to finish while Zebra is running.
+                old_databases_result = &mut old_databases_task_handle_fused => {
+                    old_databases_result
+                        .unwrap_or_else(|_| panic!(
+                            "unexpected panic deleting old database directories"));
+
+                    exit_when_task_finishes = false;
+                    Ok(())
+                }
             };
 
             // Stop Zebra if a task finished and returned an error,
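The fuse-and-pin handling of the new handle mirrors the existing `groth16_download_handle` logic and is easy to misread. Below is a small self-contained sketch of the same startup-task pattern, not taken from zebrad: the task, timings, and three-iteration loop are invented for illustration, and it assumes the `tokio` (full features) and `futures` crates.

use futures::FutureExt;
use tokio::{
    pin, select,
    task::JoinHandle,
    time::{sleep, Duration},
};

#[tokio::main]
async fn main() {
    // A one-off startup task, like the old database cleanup above.
    let mut startup_task_handle: JoinHandle<()> =
        tokio::task::spawn_blocking(|| println!("startup work done"));

    // Fusing lets the loop keep polling the handle after it completes
    // (a finished Fuse just returns Pending), and pinning lets the select
    // arm poll it by mutable reference.
    let startup_task_fused = (&mut startup_task_handle).fuse();
    pin!(startup_task_fused);

    for _ in 0..3 {
        select! {
            result = &mut startup_task_fused => {
                result.expect("startup task panicked");
                println!("startup task finished; other work keeps running");
            }
            _ = sleep(Duration::from_millis(50)) => {
                println!("long-running work made progress");
            }
        }
    }

    // On shutdown, abort the startup task if it is still running.
    startup_task_handle.abort();
}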
@@ -324,6 +341,7 @@ impl StartCmd {
 
         // startup tasks
         groth16_download_handle.abort();
+        old_databases_task_handle.abort();
 
         exit_status
     }
@@ -1642,6 +1642,70 @@ async fn fully_synced_rpc_test() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn delete_old_databases() -> Result<()> {
+    use std::fs::{canonicalize, create_dir};
+
+    zebra_test::init();
+
+    let mut config = default_test_config()?;
+    let run_dir = testdir()?;
+    let cache_dir = run_dir.path().join("state");
+
+    // create cache dir
+    create_dir(cache_dir.clone())?;
+
+    // create a v1 dir outside cache dir that should not be deleted
+    let outside_dir = run_dir.path().join("v1");
+    create_dir(&outside_dir)?;
+    assert!(outside_dir.as_path().exists());
+
+    // create a `v1` dir inside cache dir that should be deleted
+    let inside_dir = cache_dir.join("v1");
+    create_dir(&inside_dir)?;
+    let canonicalized_inside_dir = canonicalize(inside_dir.clone()).ok().unwrap();
+    assert!(inside_dir.as_path().exists());
+
+    // modify config with our cache dir and not ephemeral configuration
+    // (delete old databases function will not run when ephemeral = true)
+    config.state.cache_dir = cache_dir;
+    config.state.ephemeral = false;
+
+    // run zebra with our config
+    let mut child = run_dir
+        .with_config(&mut config)?
+        .spawn_child(args!["start"])?;
+
+    // delete checker running
+    child.expect_stdout_line_matches("checking for old database versions".to_string())?;
+
+    // inside dir was deleted
+    child.expect_stdout_line_matches(format!(
+        "deleted outdated state directory deleted_state={:?}",
+        canonicalized_inside_dir
+    ))?;
+    assert!(!inside_dir.as_path().exists());
+
+    // deleting old databases task ended
+    child.expect_stdout_line_matches("finished old database version cleanup task".to_string())?;
+
+    // outside dir was not deleted
+    assert!(outside_dir.as_path().exists());
+
+    // finish
+    child.kill()?;
+
+    let output = child.wait_with_output()?;
+    let output = output.assert_failure()?;
+
+    // [Note on port conflict](#Note on port conflict)
+    output
+        .assert_was_killed()
+        .wrap_err("Possible port conflict. Are there other acceptance tests running?")?;
+
+    Ok(())
+}
+
 /// Test sending transactions using a lightwalletd instance connected to a zebrad instance.
 ///
 /// See [`common::lightwalletd::send_transaction_test`] for more information.