feat(scanner): Restart scanning where left (#8080)
* start scanner where it was left * fix tests * add a `scan_start_where_left` test * refactor a log msg * fix some comments * remove function * fix doc comment * clippy * fix `sapling_keys_and_last_scanned_heights()` * simplify start height * i went too far, revert some changes back * change log info to every 10k blocks * fix build * Update height snapshot code and check last height is consistent * Add strictly before and strictly after database key gets * Move to the previous key using strictly before ops * Assert that keys are only inserted once * Update the index in each loop * Update snapshots * Remove debugging code * start scanning at min available height --------- Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
ac72c2b1de
commit
92758a0d9f
|
@ -59,4 +59,9 @@ impl Config {
|
|||
pub fn db_config(&self) -> &DbConfig {
|
||||
&self.db_config
|
||||
}
|
||||
|
||||
/// Returns the database-specific config as mutable.
|
||||
pub fn db_config_mut(&mut self) -> &mut DbConfig {
|
||||
&mut self.db_config
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ const INITIAL_WAIT: Duration = Duration::from_secs(15);
|
|||
const CHECK_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// We log an info log with progress after this many blocks.
|
||||
const INFO_LOG_INTERVAL: u32 = 100_000;
|
||||
const INFO_LOG_INTERVAL: u32 = 10_000;
|
||||
|
||||
/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
|
||||
/// `storage`, and then writes the results to `storage`.
|
||||
|
@ -64,21 +64,21 @@ pub async fn start(
|
|||
storage: Storage,
|
||||
) -> Result<(), Report> {
|
||||
let network = storage.network();
|
||||
let mut height = storage.min_sapling_birthday_height();
|
||||
|
||||
// Read keys from the storage on disk, which can block async execution.
|
||||
let key_storage = storage.clone();
|
||||
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
|
||||
let key_heights = tokio::task::spawn_blocking(move || key_storage.sapling_keys_last_heights())
|
||||
.wait_for_panics()
|
||||
.await;
|
||||
let key_birthdays = Arc::new(key_birthdays);
|
||||
let key_heights = Arc::new(key_heights);
|
||||
|
||||
let mut height = get_min_height(&key_heights).unwrap_or(storage.min_sapling_birthday_height());
|
||||
|
||||
// Parse and convert keys once, then use them to scan all blocks.
|
||||
// There is some cryptography here, but it should be fast even with thousands of keys.
|
||||
let parsed_keys: HashMap<
|
||||
SaplingScanningKey,
|
||||
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
|
||||
> = key_birthdays
|
||||
> = key_heights
|
||||
.keys()
|
||||
.map(|key| {
|
||||
let parsed_keys = sapling_key_to_scan_block_keys(key, network)?;
|
||||
|
@ -96,7 +96,7 @@ pub async fn start(
|
|||
state.clone(),
|
||||
chain_tip_change.clone(),
|
||||
storage.clone(),
|
||||
key_birthdays.clone(),
|
||||
key_heights.clone(),
|
||||
parsed_keys.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
@ -125,7 +125,7 @@ pub async fn scan_height_and_store_results(
|
|||
mut state: State,
|
||||
chain_tip_change: ChainTipChange,
|
||||
storage: Storage,
|
||||
key_birthdays: Arc<HashMap<SaplingScanningKey, Height>>,
|
||||
key_last_scanned_heights: Arc<HashMap<SaplingScanningKey, Height>>,
|
||||
parsed_keys: Arc<
|
||||
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
|
||||
>,
|
||||
|
@ -135,19 +135,7 @@ pub async fn scan_height_and_store_results(
|
|||
// Only log at info level every 100,000 blocks.
|
||||
//
|
||||
// TODO: also log progress every 5 minutes once we reach the tip?
|
||||
let is_info_log =
|
||||
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;
|
||||
|
||||
// TODO: add debug logs?
|
||||
if is_info_log {
|
||||
info!(
|
||||
"Scanning the blockchain: now at block {:?}, current tip {:?}",
|
||||
height,
|
||||
chain_tip_change
|
||||
.latest_chain_tip()
|
||||
.best_tip_height_and_hash(),
|
||||
);
|
||||
}
|
||||
let is_info_log = height.0 % INFO_LOG_INTERVAL == 0;
|
||||
|
||||
// Get a block from the state.
|
||||
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
|
||||
|
@ -168,24 +156,29 @@ pub async fn scan_height_and_store_results(
|
|||
// Scan it with all the keys.
|
||||
//
|
||||
// TODO: scan each key in parallel (after MVP?)
|
||||
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
|
||||
for (key_num, (sapling_key, last_scanned_height)) in key_last_scanned_heights.iter().enumerate()
|
||||
{
|
||||
// Only scan what was not scanned for each key
|
||||
if height <= *last_scanned_height {
|
||||
continue;
|
||||
}
|
||||
|
||||
// # Security
|
||||
//
|
||||
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
|
||||
// the logs could use the key to view those transactions.
|
||||
if is_info_log {
|
||||
info!(
|
||||
"Scanning the blockchain for key {}, started at block {:?}",
|
||||
key_num, birthday_height,
|
||||
"Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
|
||||
key_num, last_scanned_height.next().expect("height is not maximum").as_usize(),
|
||||
height.as_usize(),
|
||||
chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
|
||||
);
|
||||
}
|
||||
|
||||
// Get the pre-parsed keys for this configured key.
|
||||
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();
|
||||
|
||||
// Scan the block, which blocks async execution until the scan is complete.
|
||||
//
|
||||
// TODO: skip scanning before birthday height (#8022)
|
||||
let sapling_key = sapling_key.clone();
|
||||
let block = block.clone();
|
||||
let mut storage = storage.clone();
|
||||
|
@ -403,3 +396,8 @@ fn scanned_block_to_db_result<Nf>(
|
|||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Get the minimal height available in a key_heights map.
|
||||
fn get_min_height(map: &HashMap<String, Height>) -> Option<Height> {
|
||||
map.values().cloned().min()
|
||||
}
|
||||
|
|
|
@ -19,6 +19,10 @@ pub use db::{SaplingScannedResult, SaplingScanningKey};
|
|||
|
||||
use self::db::ScannerWriteBatch;
|
||||
|
||||
/// We insert an empty results entry to the database every this interval for each stored key,
|
||||
/// so we can track progress.
|
||||
const INSERT_CONTROL_INTERVAL: u32 = 1_000;
|
||||
|
||||
/// Store key info and results of the scan.
|
||||
///
|
||||
/// `rocksdb` allows concurrent writes through a shared reference,
|
||||
|
@ -87,16 +91,14 @@ impl Storage {
|
|||
self.write_batch(batch);
|
||||
}
|
||||
|
||||
/// Returns all the keys and their birthdays.
|
||||
///
|
||||
/// Birthdays are adjusted to sapling activation if they are too low or missing.
|
||||
/// Returns all the keys and their last scanned heights.
|
||||
///
|
||||
/// # Performance / Hangs
|
||||
///
|
||||
/// This method can block while reading database files, so it must be inside spawn_blocking()
|
||||
/// in async code.
|
||||
pub fn sapling_keys(&self) -> HashMap<SaplingScanningKey, Height> {
|
||||
self.sapling_keys_and_birthday_heights()
|
||||
pub fn sapling_keys_last_heights(&self) -> HashMap<SaplingScanningKey, Height> {
|
||||
self.sapling_keys_and_last_scanned_heights()
|
||||
}
|
||||
|
||||
/// Add the sapling results for `height` to the storage. The results can be any map of
|
||||
|
@ -116,6 +118,10 @@ impl Storage {
|
|||
// in a single batch.
|
||||
let mut batch = ScannerWriteBatch::default();
|
||||
|
||||
// Every `INSERT_CONTROL_INTERVAL` we add a new entry to the scanner database for each key
|
||||
// so we can track progress made in the last interval even if no transaction was yet found.
|
||||
let is_control_time = height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();
|
||||
|
||||
for (index, sapling_result) in sapling_results {
|
||||
let index = SaplingScannedDatabaseIndex {
|
||||
sapling_key: sapling_key.clone(),
|
||||
|
@ -130,6 +136,11 @@ impl Storage {
|
|||
batch.insert_sapling_result(self, entry);
|
||||
}
|
||||
|
||||
// Add tracking entry for key.
|
||||
if is_control_time {
|
||||
batch.insert_sapling_height(self, sapling_key, height);
|
||||
}
|
||||
|
||||
self.write_batch(batch);
|
||||
}
|
||||
|
||||
|
|
|
@ -81,7 +81,17 @@ impl Storage {
|
|||
|
||||
let new_storage = Self { db };
|
||||
|
||||
// TODO: report the last scanned height here?
|
||||
// Report where we are for each key in the database.
|
||||
let keys = new_storage.sapling_keys_last_heights();
|
||||
for (key_num, (_key, height)) in keys.iter().enumerate() {
|
||||
tracing::info!(
|
||||
"Last scanned height for key number {} is {}, resuming at {}",
|
||||
key_num,
|
||||
height.as_usize(),
|
||||
height.next().expect("height is not maximum").as_usize(),
|
||||
);
|
||||
}
|
||||
|
||||
tracing::info!("loaded Zebra scanner cache");
|
||||
|
||||
new_storage
|
||||
|
|
|
@ -97,50 +97,37 @@ impl Storage {
|
|||
.collect()
|
||||
}
|
||||
|
||||
/// Returns all the keys and their birthday heights.
|
||||
pub fn sapling_keys_and_birthday_heights(&self) -> HashMap<SaplingScanningKey, Height> {
|
||||
// This code is a bit complex because we don't have a separate column family for keys
|
||||
// and their birthday heights.
|
||||
//
|
||||
// TODO: make a separate column family after the MVP.
|
||||
|
||||
/// Returns all the keys and their last scanned heights.
|
||||
pub fn sapling_keys_and_last_scanned_heights(&self) -> HashMap<SaplingScanningKey, Height> {
|
||||
let sapling_tx_ids = self.sapling_tx_ids_cf();
|
||||
let mut keys = HashMap::new();
|
||||
|
||||
// The minimum key is invalid or a dummy key, so we will never have an entry for it.
|
||||
let mut find_next_key_index = SaplingScannedDatabaseIndex::min();
|
||||
let mut last_stored_record: Option<(
|
||||
SaplingScannedDatabaseIndex,
|
||||
Option<SaplingScannedResult>,
|
||||
)> = self.db.zs_last_key_value(&sapling_tx_ids);
|
||||
|
||||
loop {
|
||||
// Find the next key, and the first height we have for it.
|
||||
let Some(entry) = self
|
||||
.db
|
||||
.zs_next_key_value_from(&sapling_tx_ids, &find_next_key_index)
|
||||
else {
|
||||
break;
|
||||
let Some((mut last_stored_record_index, _result)) = last_stored_record else {
|
||||
return keys;
|
||||
};
|
||||
|
||||
let sapling_key = entry.0.sapling_key;
|
||||
let mut height = entry.0.tx_loc.height;
|
||||
let _first_result: Option<SaplingScannedResult> = entry.1;
|
||||
let sapling_key = last_stored_record_index.sapling_key.clone();
|
||||
let height = last_stored_record_index.tx_loc.height;
|
||||
|
||||
let height_results = self.sapling_results_for_key_and_height(&sapling_key, height);
|
||||
let prev_height = keys.insert(sapling_key.clone(), height);
|
||||
assert_eq!(
|
||||
prev_height, None,
|
||||
"unexpected duplicate key: keys must only be inserted once\
|
||||
last_stored_record_index: {last_stored_record_index:?}",
|
||||
);
|
||||
|
||||
// If there are no results for this block, then it's a "skip up to height" marker, and
|
||||
// the birthday height is the next height. If there are some results, it's the actual
|
||||
// birthday height.
|
||||
if height_results.values().all(Option::is_none) {
|
||||
height = height
|
||||
.next()
|
||||
.expect("results should only be stored for validated block heights");
|
||||
}
|
||||
|
||||
keys.insert(sapling_key.clone(), height);
|
||||
|
||||
// Skip all the results before the next key.
|
||||
find_next_key_index = SaplingScannedDatabaseIndex::max_for_key(&sapling_key);
|
||||
// Skip all the results until the next key.
|
||||
last_stored_record_index = SaplingScannedDatabaseIndex::min_for_key(&sapling_key);
|
||||
last_stored_record = self
|
||||
.db
|
||||
.zs_prev_key_value_strictly_before(&sapling_tx_ids, &last_stored_record_index);
|
||||
}
|
||||
|
||||
keys
|
||||
}
|
||||
|
||||
/// Returns the Sapling indexes and results in the supplied range.
|
||||
|
@ -216,4 +203,15 @@ impl ScannerWriteBatch {
|
|||
SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
|
||||
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
|
||||
}
|
||||
|
||||
/// Insert sapling height with no results
|
||||
pub(crate) fn insert_sapling_height(
|
||||
&mut self,
|
||||
storage: &Storage,
|
||||
sapling_key: &SaplingScanningKey,
|
||||
height: Height,
|
||||
) {
|
||||
let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
|
||||
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -147,30 +147,26 @@ fn snapshot_raw_rocksdb_column_family_data(db: &ScannerDb, original_cf_names: &[
|
|||
/// Snapshot typed scanner result data using high-level storage methods,
|
||||
/// using `cargo insta` and RON serialization.
|
||||
fn snapshot_typed_result_data(storage: &Storage) {
|
||||
// TODO: snapshot the latest scanned heights after PR #8080 merges
|
||||
//insta::assert_ron_snapshot!("latest_heights", latest_scanned_heights);
|
||||
|
||||
// Make sure the typed key format doesn't accidentally change.
|
||||
//
|
||||
// TODO: update this after PR #8080
|
||||
let sapling_keys_and_birthday_heights = storage.sapling_keys();
|
||||
let sapling_keys_last_heights = storage.sapling_keys_last_heights();
|
||||
|
||||
// HashMap has an unstable order across Rust releases, so we need to sort it here.
|
||||
insta::assert_ron_snapshot!(
|
||||
"sapling_keys",
|
||||
sapling_keys_and_birthday_heights,
|
||||
sapling_keys_last_heights,
|
||||
{
|
||||
"." => insta::sorted_redaction()
|
||||
}
|
||||
);
|
||||
|
||||
// HashMap has an unstable order across Rust releases, so we need to sort it here as well.
|
||||
for (key_index, (sapling_key, _birthday_height)) in sapling_keys_and_birthday_heights
|
||||
.iter()
|
||||
.sorted()
|
||||
.enumerate()
|
||||
for (key_index, (sapling_key, last_height)) in
|
||||
sapling_keys_last_heights.iter().sorted().enumerate()
|
||||
{
|
||||
let sapling_results = storage.sapling_results(sapling_key);
|
||||
|
||||
assert_eq!(sapling_results.keys().max(), Some(last_height));
|
||||
|
||||
// Check internal database method consistency
|
||||
for (height, results) in sapling_results.iter() {
|
||||
let sapling_index_and_results =
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419200),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(279999),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(279999),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(279999),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
---
|
||||
source: zebra-scan/src/storage/db/tests/snapshot.rs
|
||||
expression: sapling_keys_and_birthday_heights
|
||||
expression: sapling_keys_last_heights
|
||||
---
|
||||
{
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(280000),
|
||||
"zxviewsfake": Height(1000000),
|
||||
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(279999),
|
||||
"zxviewsfake": Height(999999),
|
||||
}
|
||||
|
|
|
@ -163,10 +163,14 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> {
|
|||
s.add_sapling_key(&key_to_be_stored, None);
|
||||
|
||||
// Check key was added
|
||||
assert_eq!(s.sapling_keys().len(), 1);
|
||||
assert_eq!(s.sapling_keys_last_heights().len(), 1);
|
||||
assert_eq!(
|
||||
s.sapling_keys().get(&key_to_be_stored),
|
||||
Some(&s.min_sapling_birthday_height())
|
||||
s.sapling_keys_last_heights()
|
||||
.get(&key_to_be_stored)
|
||||
.expect("height is stored")
|
||||
.next()
|
||||
.expect("height is not maximum"),
|
||||
s.min_sapling_birthday_height()
|
||||
);
|
||||
|
||||
let nf = Nullifier([7; 32]);
|
||||
|
|
|
@ -220,6 +220,16 @@ pub trait ReadDisk {
|
|||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk;
|
||||
|
||||
/// Returns the first key strictly greater than `lower_bound` in `cf`,
|
||||
/// and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if there are no keys greater than `lower_bound`.
|
||||
fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk;
|
||||
|
||||
/// Returns the first key less than or equal to `upper_bound` in `cf`,
|
||||
/// and the corresponding value.
|
||||
///
|
||||
|
@ -230,6 +240,16 @@ pub trait ReadDisk {
|
|||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk;
|
||||
|
||||
/// Returns the first key strictly less than `upper_bound` in `cf`,
|
||||
/// and the corresponding value.
|
||||
///
|
||||
/// Returns `None` if there are no keys less than `upper_bound`.
|
||||
fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk;
|
||||
|
||||
/// Returns the keys and values in `cf` in `range`, in an ordered `BTreeMap`.
|
||||
///
|
||||
/// Holding this iterator open might delay block commit transactions.
|
||||
|
@ -321,7 +341,6 @@ impl ReadDisk for DiskDb {
|
|||
.is_some()
|
||||
}
|
||||
|
||||
#[allow(clippy::unwrap_in_result)]
|
||||
fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
|
@ -332,7 +351,6 @@ impl ReadDisk for DiskDb {
|
|||
self.zs_forward_range_iter(cf, ..).next()
|
||||
}
|
||||
|
||||
#[allow(clippy::unwrap_in_result)]
|
||||
fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
|
@ -343,28 +361,46 @@ impl ReadDisk for DiskDb {
|
|||
self.zs_reverse_range_iter(cf, ..).next()
|
||||
}
|
||||
|
||||
#[allow(clippy::unwrap_in_result)]
|
||||
fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk,
|
||||
{
|
||||
// Reading individual values from iterators does not seem to cause database hangs.
|
||||
self.zs_forward_range_iter(cf, lower_bound..).next()
|
||||
}
|
||||
|
||||
#[allow(clippy::unwrap_in_result)]
|
||||
fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk,
|
||||
{
|
||||
use std::ops::Bound::*;
|
||||
|
||||
// There is no standard syntax for an excluded start bound.
|
||||
self.zs_forward_range_iter(cf, (Excluded(lower_bound), Unbounded))
|
||||
.next()
|
||||
}
|
||||
|
||||
fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk,
|
||||
{
|
||||
// Reading individual values from iterators does not seem to cause database hangs.
|
||||
self.zs_reverse_range_iter(cf, ..=upper_bound).next()
|
||||
}
|
||||
|
||||
fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
K: IntoDisk + FromDisk,
|
||||
V: FromDisk,
|
||||
{
|
||||
self.zs_reverse_range_iter(cf, ..upper_bound).next()
|
||||
}
|
||||
|
||||
fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
|
||||
where
|
||||
C: rocksdb::AsColumnFamilyRef,
|
||||
|
|
|
@ -2860,3 +2860,100 @@ fn scan_task_starts() -> Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test that the scanner can continue scanning where it was left when zebrad restarts.
|
||||
///
|
||||
/// Needs a cache state close to the tip. A possible way to run it locally is:
|
||||
///
|
||||
/// export ZEBRA_CACHED_STATE_DIR="/path/to/zebra/state"
|
||||
/// cargo test scan_start_where_left --features="shielded-scan" -- --ignored --nocapture
|
||||
///
|
||||
/// The test will run zebrad with a key to scan, scan the first few blocks after sapling and then stops.
|
||||
/// Then it will restart zebrad and check that it resumes scanning where it was left.
|
||||
///
|
||||
/// Note: This test will remove all the contents you may have in the ZEBRA_CACHED_STATE_DIR/private-scan directory
|
||||
/// so it can start with an empty scanning state.
|
||||
#[ignore]
|
||||
#[test]
|
||||
#[cfg(feature = "shielded-scan")]
|
||||
fn scan_start_where_left() -> Result<()> {
|
||||
use indexmap::IndexMap;
|
||||
use zebra_scan::storage::db::SCANNER_DATABASE_KIND;
|
||||
|
||||
let _init_guard = zebra_test::init();
|
||||
|
||||
// use `UpdateZebraCachedStateNoRpc` as the test type to make sure a zebrad cache state is available.
|
||||
let test_type = TestType::UpdateZebraCachedStateNoRpc;
|
||||
if let Some(cache_dir) = test_type.zebrad_state_path("scan test") {
|
||||
// Add a key to the config
|
||||
const ZECPAGES_VIEWING_KEY: &str = "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz";
|
||||
let mut config = default_test_config(Mainnet)?;
|
||||
let mut keys = IndexMap::new();
|
||||
keys.insert(ZECPAGES_VIEWING_KEY.to_string(), 1);
|
||||
config.shielded_scan.sapling_keys_to_scan = keys;
|
||||
|
||||
// Add the cache dir to shielded scan, make it the same as the zebrad cache state.
|
||||
config.shielded_scan.db_config_mut().cache_dir = cache_dir.clone();
|
||||
config.shielded_scan.db_config_mut().ephemeral = false;
|
||||
|
||||
// Add the cache dir to state.
|
||||
config.state.cache_dir = cache_dir.clone();
|
||||
config.state.ephemeral = false;
|
||||
|
||||
// Remove the scan directory before starting.
|
||||
let scan_db_path = cache_dir.join(SCANNER_DATABASE_KIND);
|
||||
fs::remove_dir_all(std::path::Path::new(&scan_db_path)).ok();
|
||||
|
||||
// Start zebra with the config.
|
||||
let mut zebrad = testdir()?
|
||||
.with_exact_config(&config)?
|
||||
.spawn_child(args!["start"])?
|
||||
.with_timeout(test_type.zebrad_timeout());
|
||||
|
||||
// Check scanner was started.
|
||||
zebrad.expect_stdout_line_matches("loaded Zebra scanner cache")?;
|
||||
|
||||
// The first time
|
||||
zebrad.expect_stdout_line_matches(
|
||||
r"Scanning the blockchain for key 0, started at block 419200, now at block 420000",
|
||||
)?;
|
||||
|
||||
// Make sure scanner scans a few blocks.
|
||||
zebrad.expect_stdout_line_matches(
|
||||
r"Scanning the blockchain for key 0, started at block 419200, now at block 430000",
|
||||
)?;
|
||||
zebrad.expect_stdout_line_matches(
|
||||
r"Scanning the blockchain for key 0, started at block 419200, now at block 440000",
|
||||
)?;
|
||||
|
||||
// Kill the node.
|
||||
zebrad.kill(false)?;
|
||||
let output = zebrad.wait_with_output()?;
|
||||
|
||||
// Make sure the command was killed
|
||||
output.assert_was_killed()?;
|
||||
output.assert_failure()?;
|
||||
|
||||
// Start the node again.
|
||||
let mut zebrad = testdir()?
|
||||
.with_exact_config(&config)?
|
||||
.spawn_child(args!["start"])?
|
||||
.with_timeout(test_type.zebrad_timeout());
|
||||
|
||||
// Resuming message.
|
||||
zebrad.expect_stdout_line_matches(
|
||||
"Last scanned height for key number 0 is 439000, resuming at 439001",
|
||||
)?;
|
||||
zebrad.expect_stdout_line_matches("loaded Zebra scanner cache")?;
|
||||
|
||||
// Start scanning where it was left.
|
||||
zebrad.expect_stdout_line_matches(
|
||||
r"Scanning the blockchain for key 0, started at block 439001, now at block 440000",
|
||||
)?;
|
||||
zebrad.expect_stdout_line_matches(
|
||||
r"Scanning the blockchain for key 0, started at block 439001, now at block 450000",
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue