change(scan): Create a function that scans one block by height, and stores the results in the database (#8045)

* Update multiple key docs

* Add a TODO for the tree size

* Fix scan() docs

* Split out a scan_height...() function

* Clarify it's the state database we're using for scanning heights

Co-authored-by: Marek <mail@marek.onl>

* Remove unused import

---------

Co-authored-by: Marek <mail@marek.onl>
This commit is contained in:
teor 2023-12-06 11:57:29 +10:00 committed by GitHub
parent 7c6a0f8388
commit 358e52bc64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 140 additions and 91 deletions

View File

@ -22,8 +22,12 @@ use zcash_primitives::{
}; };
use zebra_chain::{ use zebra_chain::{
block::Block, chain_tip::ChainTip, diagnostic::task::WaitForPanics, parameters::Network, block::{Block, Height},
serialization::ZcashSerialize, transaction::Transaction, chain_tip::ChainTip,
diagnostic::task::WaitForPanics,
parameters::Network,
serialization::ZcashSerialize,
transaction::Transaction,
}; };
use zebra_state::{ChainTipChange, SaplingScannedResult}; use zebra_state::{ChainTipChange, SaplingScannedResult};
@ -51,7 +55,7 @@ const INFO_LOG_INTERVAL: u32 = 100_000;
/// Start a scan task that reads blocks from `state`, scans them with the configured keys in /// Start a scan task that reads blocks from `state`, scans them with the configured keys in
/// `storage`, and then writes the results to `storage`. /// `storage`, and then writes the results to `storage`.
pub async fn start( pub async fn start(
mut state: State, state: State,
chain_tip_change: ChainTipChange, chain_tip_change: ChainTipChange,
storage: Storage, storage: Storage,
) -> Result<(), Report> { ) -> Result<(), Report> {
@ -63,6 +67,7 @@ pub async fn start(
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys()) let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics() .wait_for_panics()
.await; .await;
let key_birthdays = Arc::new(key_birthdays);
// Parse and convert keys once, then use them to scan all blocks. // Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys. // There is some cryptography here, but it should be fast even with thousands of keys.
@ -76,32 +81,56 @@ pub async fn start(
Ok::<_, Report>((key.clone(), parsed_keys)) Ok::<_, Report>((key.clone(), parsed_keys))
}) })
.try_collect()?; .try_collect()?;
let parsed_keys = Arc::new(parsed_keys);
// Give empty states time to verify some blocks before we start scanning. // Give empty states time to verify some blocks before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await; tokio::time::sleep(INITIAL_WAIT).await;
loop { loop {
// Get a block from the state. let scanned_height = scan_height_and_store_results(
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init(). height,
let block = state state.clone(),
.ready() chain_tip_change.clone(),
.await storage.clone(),
.map_err(|e| eyre!(e))? key_birthdays.clone(),
.call(zebra_state::Request::Block(height.into())) parsed_keys.clone(),
.await )
.map_err(|e| eyre!(e))?; .await?;
let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => {
// If we've reached the tip, sleep for a while then try and get the same block. // If we've reached the tip, sleep for a while then try and get the same block.
if scanned_height.is_none() {
tokio::time::sleep(CHECK_INTERVAL).await; tokio::time::sleep(CHECK_INTERVAL).await;
continue; continue;
} }
_ => unreachable!("unmatched response to a state::Tip request"),
};
// Only log at info level every 100,000 blocks height = height
.next()
.expect("a valid blockchain never reaches the max height");
}
}
/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
///
/// Returns:
/// - `Ok(Some(height))` if the height was scanned,
/// - `Ok(None)` if the height was not in the state, and
/// - `Err(error)` on fatal errors.
pub async fn scan_height_and_store_results(
height: Height,
mut state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
key_birthdays: Arc<HashMap<SaplingScanningKey, Height>>,
parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
) -> Result<Option<Height>, Report> {
let network = storage.network();
// Only log at info level every 100,000 blocks.
//
// TODO: also log progress every 5 minutes once we reach the tip?
let is_info_log = let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0; height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;
@ -116,6 +145,25 @@ pub async fn start(
); );
} }
// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
let block = state
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Block(height.into()))
.await
.map_err(|e| eyre!(e))?;
let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => return Ok(None),
_ => unreachable!("unmatched response to a state::Tip request"),
};
// Scan it with all the keys.
//
// TODO: scan each key in parallel (after MVP?)
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() { for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
// # Security // # Security
// //
@ -134,7 +182,6 @@ pub async fn start(
// Scan the block, which blocks async execution until the scan is complete. // Scan the block, which blocks async execution until the scan is complete.
// //
// TODO: skip scanning before birthday height (#8022) // TODO: skip scanning before birthday height (#8022)
// TODO: scan each key in parallel (after MVP?)
let sapling_key = sapling_key.clone(); let sapling_key = sapling_key.clone();
let block = block.clone(); let block = block.clone();
let mut storage = storage.clone(); let mut storage = storage.clone();
@ -147,6 +194,8 @@ pub async fn start(
// And we can't set them close to 0, because the scanner subtracts the number of notes // And we can't set them close to 0, because the scanner subtracts the number of notes
// in the block, and panics with "attempt to subtract with overflow". The number of // in the block, and panics with "attempt to subtract with overflow". The number of
// notes in a block must be less than this value, this is a consensus rule. // notes in a block must be less than this value, this is a consensus rule.
//
// TODO: use the real sapling tree size: `zs::Response::SaplingTree().position() + 1`
let sapling_tree_size = 1 << 16; let sapling_tree_size = 1 << 16;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
@ -167,14 +216,15 @@ pub async fn start(
.await?; .await?;
} }
height = height Ok(Some(height))
.next()
.expect("a valid blockchain never reaches the max height");
}
} }
/// Returns transactions belonging to the given `ScanningKey`. This list of keys should come from /// Returns the transactions from `block` belonging to the given `scanning_keys`.
/// a single configured `SaplingScanningKey`. /// This list of keys should come from a single configured `SaplingScanningKey`.
///
/// For example, there are two individual viewing keys for most shielded transfers:
/// - the payment (external) key, and
/// - the change (internal) key.
/// ///
/// # Performance / Hangs /// # Performance / Hangs
/// ///
@ -182,9 +232,8 @@ pub async fn start(
/// in async code. /// in async code.
/// ///
/// TODO: /// TODO:
/// - Remove the `sapling_tree_size` parameter or turn it into an `Option` once we have access to /// - Pass the real `sapling_tree_size` parameter from the state.
/// Zebra's state, and we can retrieve the tree size ourselves. /// - Add other prior block metadata.
/// - Add prior block metadata once we have access to Zebra's state.
pub fn scan_block<K: ScanningKey>( pub fn scan_block<K: ScanningKey>(
network: Network, network: Network,
block: &Block, block: &Block,