change(scan): Store one transaction ID per database row, to make queries easier (#8062)

* Upgrade the scanner database major version to 1

* Update format docs

* Change the high-level scanner db format

* Change the scanner serialization formats

* Fix value format and tests

* Fix incorrect types

* Update documentation

---------

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
teor 2023-12-07 03:34:21 +10:00 committed by GitHub
parent 358e52bc64
commit 36f226362d
11 changed files with 245 additions and 107 deletions
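
Conceptually, the change replaces one database row per (key, height) holding a list of transaction IDs with one row per (key, height, transaction index) holding a single optional transaction ID, so per-height and per-transaction lookups become plain key range queries. A minimal sketch of the two shapes, using standard-library stand-ins rather than the real Zebra types (all names here are illustrative):

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the real Zebra types.
type SaplingScanningKey = String;
type Height = u32;
type TransactionIndex = u16;
type TxId = [u8; 32];

// Before: one row per (key, height), holding every matching txid in that block.
type OldRows = BTreeMap<(SaplingScanningKey, Height), Vec<TxId>>;

// After: one row per (key, height, transaction index), holding one optional txid.
// `None` marks a block that was scanned (or skipped) without finding any results.
type NewRows = BTreeMap<(SaplingScanningKey, Height, TransactionIndex), Option<TxId>>;

fn main() {
    let mut rows: NewRows = BTreeMap::new();

    // A result for transaction index 3 in one block, under an example key.
    rows.insert(("zxviews-example".to_string(), 419_200, 3), Some([0u8; 32]));

    // A "scanned, no results" marker is stored against the coinbase index with `None`.
    rows.insert(("zxviews-example".to_string(), 419_201, 0), None);

    assert_eq!(rows.len(), 2);
}
```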

View File

@ -1,6 +1,10 @@
//! The scanner task and scanning APIs.
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
};
use color_eyre::{eyre::eyre, Report};
use itertools::Itertools;
@ -29,7 +33,7 @@ use zebra_chain::{
serialization::ZcashSerialize,
transaction::Transaction,
};
use zebra_state::{ChainTipChange, SaplingScannedResult};
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
use crate::storage::{SaplingScanningKey, Storage};
@ -207,8 +211,8 @@ pub async fn scan_height_and_store_results(
let dfvk_res = scanned_block_to_db_result(dfvk_res);
let ivk_res = scanned_block_to_db_result(ivk_res);
storage.add_sapling_result(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_result(sapling_key, height, ivk_res);
storage.add_sapling_results(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_results(sapling_key, height, ivk_res);
Ok::<_, Report>(())
})
@ -385,10 +389,17 @@ fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
}
/// Convert a scanned block to the scanner database results for that block.
fn scanned_block_to_db_result<Nf>(scanned_block: ScannedBlock<Nf>) -> Vec<SaplingScannedResult> {
fn scanned_block_to_db_result<Nf>(
scanned_block: ScannedBlock<Nf>,
) -> BTreeMap<TransactionIndex, SaplingScannedResult> {
scanned_block
.transactions()
.iter()
.map(|tx| SaplingScannedResult::from(tx.txid.as_ref()))
.map(|tx| {
(
TransactionIndex::from_usize(tx.index),
SaplingScannedResult::from(tx.txid.as_ref()),
)
})
.collect()
}

View File

@ -6,7 +6,9 @@ use zebra_chain::{
block::Height,
parameters::{Network, NetworkUpgrade},
};
use zebra_state::{SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex};
use zebra_state::{
SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, TransactionIndex, TransactionLocation,
};
use crate::config::Config;
@ -56,8 +58,8 @@ impl Storage {
pub fn new(config: &Config, network: Network) -> Self {
let mut storage = Self::new_db(config, network);
for (key, birthday) in config.sapling_keys_to_scan.iter() {
storage.add_sapling_key(key.clone(), Some(zebra_chain::block::Height(*birthday)));
for (sapling_key, birthday) in config.sapling_keys_to_scan.iter() {
storage.add_sapling_key(sapling_key, Some(zebra_chain::block::Height(*birthday)));
}
storage
@ -69,12 +71,12 @@ impl Storage {
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_key(&mut self, key: SaplingScanningKey, birthday: Option<Height>) {
pub fn add_sapling_key(&mut self, sapling_key: &SaplingScanningKey, birthday: Option<Height>) {
// It's ok to write some keys and not others during shutdown, so each key can get its own
// batch. (They will be re-written on startup anyway.)
let mut batch = ScannerWriteBatch::default();
batch.insert_sapling_key(self, key, birthday);
batch.insert_sapling_key(self, sapling_key, birthday);
self.write_batch(batch);
}
@ -91,33 +93,35 @@ impl Storage {
self.sapling_keys_and_birthday_heights()
}
/// Add a sapling result to the storage.
/// Add the sapling results for `height` to the storage.
///
/// # Performance / Hangs
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_result(
pub fn add_sapling_results(
&mut self,
sapling_key: SaplingScanningKey,
height: Height,
sapling_result: Vec<SaplingScannedResult>,
sapling_results: BTreeMap<TransactionIndex, SaplingScannedResult>,
) {
// It's ok to write some results and not others during shutdown, so each result can get its
// own batch. (They will be re-scanned on startup anyway.)
// We skip heights that have one or more results, so the results for each height must be
// in a single batch.
let mut batch = ScannerWriteBatch::default();
let index = SaplingScannedDatabaseIndex {
sapling_key,
height,
};
for (index, sapling_result) in sapling_results {
let index = SaplingScannedDatabaseIndex {
sapling_key: sapling_key.clone(),
tx_loc: TransactionLocation::from_parts(height, index),
};
let entry = SaplingScannedDatabaseEntry {
index,
value: sapling_result,
};
let entry = SaplingScannedDatabaseEntry {
index,
value: Some(sapling_result),
};
batch.insert_sapling_result(self, entry);
batch.insert_sapling_result(self, entry);
}
self.write_batch(batch);
}
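
The doc comment above notes that these writes can block, so async callers are expected to run them inside `spawn_blocking()`. A minimal sketch of such a caller, assuming a tokio runtime and placeholder inputs produced elsewhere (the helper and its name are illustrative, not part of this change):

```rust
use std::collections::BTreeMap;

use zebra_chain::block::Height;
use zebra_state::{SaplingScannedResult, TransactionIndex};

use crate::storage::{SaplingScanningKey, Storage};

/// Stores one block's results on a blocking thread, then hands `Storage` back to the caller.
async fn store_results(
    mut storage: Storage,
    sapling_key: SaplingScanningKey,
    height: Height,
    results: BTreeMap<TransactionIndex, SaplingScannedResult>,
) -> Result<Storage, tokio::task::JoinError> {
    tokio::task::spawn_blocking(move || {
        // Blocking database write: keep it off the async executor threads.
        storage.add_sapling_results(sapling_key, height, results);
        storage
    })
    .await
}
```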

View File

@ -35,6 +35,10 @@ pub const SCANNER_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
// TODO: add Orchard support
];
/// The major version number of the scanner database. This must be updated whenever the database
/// format changes.
const SCANNER_DATABASE_FORMAT_MAJOR_VERSION: u64 = 1;
impl Storage {
// Creation
@ -96,8 +100,8 @@ impl Storage {
/// The database format version in the running scanner code.
pub fn database_format_version_in_code() -> Version {
// TODO: implement scanner database versioning
Version::new(0, 0, 0)
// TODO: implement in-place scanner database format upgrades
Version::new(SCANNER_DATABASE_FORMAT_MAJOR_VERSION, 0, 0)
}
/// Check for panics in code running in spawned threads.

View File

@ -4,24 +4,34 @@
//!
//! | name | key | value |
//! |------------------|-------------------------------|--------------------------|
//! | `sapling_tx_ids` | `SaplingScannedDatabaseIndex` | `Vec<transaction::Hash>` |
//! | `sapling_tx_ids` | `SaplingScannedDatabaseIndex` | `Option<SaplingScannedResult>` |
//!
//! And types:
//! SaplingScannedDatabaseIndex = `SaplingScanningKey` | `Height`
//! `SaplingScannedResult`: same as `transaction::Hash`, but with bytes in display order.
//! `None` is stored as a zero-length array of bytes.
//!
//! `SaplingScannedDatabaseIndex` = `SaplingScanningKey` | `TransactionLocation`
//! `TransactionLocation` = `Height` | `TransactionIndex`
//!
//! This format allows us to efficiently find all the results for each key, and the latest height
//! for each key.
//!
//! If there are no results for a height, we store an empty list of results. This allows us to scan
//! each key from the next height after we restart. We also use this mechanism to store key
//! birthday heights, by storing the height before the birthday as the "last scanned" block.
//! If there are no results for a height, we store `None` as the result for the coinbase
//! transaction. This allows us to scan each key from the next height after we restart. We also use
//! this mechanism to store key birthday heights, by storing the height before the birthday as the
//! "last scanned" block.
use std::collections::{BTreeMap, HashMap};
use std::{
collections::{BTreeMap, HashMap},
ops::RangeBounds,
};
use itertools::Itertools;
use zebra_chain::block::Height;
use zebra_state::{
AsColumnFamilyRef, ReadDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex,
SaplingScannedResult, SaplingScanningKey, WriteDisk,
SaplingScannedResult, SaplingScanningKey, TransactionIndex, WriteDisk,
};
use crate::storage::Storage;
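
To make the key ordering concrete, here is a simplified standalone model of the index layout described in the module docs above. The byte widths and names are illustrative only; the real code derives them from `HEIGHT_DISK_BYTES` and `TRANSACTION_LOCATION_DISK_BYTES`:

```rust
// Illustrative model: variable-length key prefix, then a 4-byte height,
// then a 2-byte transaction index, all compared lexicographically.
fn example_index_key(sapling_key: &str, height: u32, tx_index: u16) -> Vec<u8> {
    let mut bytes = Vec::new();
    bytes.extend(sapling_key.as_bytes()); // scanning key prefix
    bytes.extend(height.to_be_bytes());   // big-endian so rows sort by height...
    bytes.extend(tx_index.to_be_bytes()); // ...then by transaction index within the block
    bytes
}

fn main() {
    // All rows for one key and height form a contiguous range, so
    // "results for this key at this height" is a single range scan.
    let lo = example_index_key("zxviews-example", 419_200, u16::MIN);
    let hi = example_index_key("zxviews-example", 419_200, u16::MAX);
    let row = example_index_key("zxviews-example", 419_200, 3);
    assert!(lo <= row && row <= hi);
}
```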
@ -36,16 +46,29 @@ pub const SAPLING_TX_IDS: &str = "sapling_tx_ids";
impl Storage {
// Reading Sapling database entries
/// Returns the results for a specific key and block height.
/// Returns the result for a specific database index (key, block height, transaction index).
//
// TODO: add tests for this method
pub fn sapling_result_for_key_and_block(
pub fn sapling_result_for_index(
&self,
index: &SaplingScannedDatabaseIndex,
) -> Vec<SaplingScannedResult> {
self.db
.zs_get(&self.sapling_tx_ids_cf(), &index)
.unwrap_or_default()
) -> Option<SaplingScannedResult> {
self.db.zs_get(&self.sapling_tx_ids_cf(), &index)
}
/// Returns the results for a specific key and block height.
pub fn sapling_results_for_key_and_height(
&self,
sapling_key: &SaplingScanningKey,
height: Height,
) -> BTreeMap<TransactionIndex, Option<SaplingScannedResult>> {
let kh_min = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
let kh_max = SaplingScannedDatabaseIndex::max_for_key_and_height(sapling_key, height);
self.sapling_results_in_range(kh_min..=kh_max)
.into_iter()
.map(|(result_index, txid)| (result_index.tx_loc.index, txid))
.collect()
}
/// Returns all the results for a specific key, indexed by height.
@ -56,10 +79,19 @@ impl Storage {
let k_min = SaplingScannedDatabaseIndex::min_for_key(sapling_key);
let k_max = SaplingScannedDatabaseIndex::max_for_key(sapling_key);
self.db
.zs_items_in_range_ordered(&self.sapling_tx_ids_cf(), k_min..=k_max)
// Get an iterator of individual transaction results, and turn it into a HashMap by height
let results: HashMap<Height, Vec<Option<SaplingScannedResult>>> = self
.sapling_results_in_range(k_min..=k_max)
.into_iter()
.map(|(index, result)| (index.height, result))
.map(|(index, result)| (index.tx_loc.height, result))
.into_group_map();
// But we want Vec<SaplingScannedResult>, with empty Vecs instead of [None, None, ...]
results
.into_iter()
.map(|(index, vector)| -> (Height, Vec<SaplingScannedResult>) {
(index, vector.into_iter().flatten().collect())
})
.collect()
}
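
The grouping step above is a common itertools pattern: collect per-transaction rows into per-height groups, then flatten away the `None`s. A standalone sketch with plain types standing in for heights and results:

```rust
use std::collections::HashMap;

use itertools::Itertools;

fn main() {
    // Per-transaction rows: (height, optional result).
    let rows = vec![(10_u32, None), (11, Some(7_u64)), (11, Some(9)), (12, None)];

    // Group the per-transaction results by height...
    let grouped: HashMap<u32, Vec<Option<u64>>> = rows.into_iter().into_group_map();

    // ...then flatten each group, so "no results" becomes an empty Vec rather than [None].
    let per_height: HashMap<u32, Vec<u64>> = grouped
        .into_iter()
        .map(|(height, results)| (height, results.into_iter().flatten().collect()))
        .collect();

    assert_eq!(per_height[&10], Vec::<u64>::new());
    assert_eq!(per_height[&11], vec![7, 9]);
}
```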
@ -85,16 +117,16 @@ impl Storage {
break;
};
let (index, results): (_, Vec<SaplingScannedResult>) = entry;
let SaplingScannedDatabaseIndex {
sapling_key,
mut height,
} = index;
let sapling_key = entry.0.sapling_key;
let mut height = entry.0.tx_loc.height;
let _first_result: Option<SaplingScannedResult> = entry.1;
// If there are no results, then it's a "skip up to height" marker, and the birthday
// height is the next height. If there are some results, it's the actual birthday
// height.
if results.is_empty() {
let height_results = self.sapling_results_for_key_and_height(&sapling_key, height);
// If there are no results for this block, then it's a "skip up to height" marker, and
// the birthday height is the next height. If there are some results, it's the actual
// birthday height.
if height_results.values().all(Option::is_none) {
height = height
.next()
.expect("results should only be stored for validated block heights");
@ -109,6 +141,17 @@ impl Storage {
keys
}
/// Returns the Sapling indexes and results in the supplied range.
///
/// Convenience method for accessing raw data with the correct types.
fn sapling_results_in_range(
&self,
range: impl RangeBounds<SaplingScannedDatabaseIndex>,
) -> BTreeMap<SaplingScannedDatabaseIndex, Option<SaplingScannedResult>> {
self.db
.zs_items_in_range_ordered(&self.sapling_tx_ids_cf(), range)
}
// Column family convenience methods
/// Returns a handle to the `sapling_tx_ids` column family.
@ -122,7 +165,7 @@ impl Storage {
/// Write `batch` to the database for this storage.
pub(crate) fn write_batch(&self, batch: ScannerWriteBatch) {
// Just panic on errors for now
// Just panic on errors for now.
self.db
.write_batch(batch.0)
.expect("unexpected database error")
@ -147,12 +190,15 @@ impl ScannerWriteBatch {
/// Insert a sapling scanning `key`, and mark all heights before `birthday_height` so they
/// won't be scanned.
///
/// If a result already exists for the height before the birthday, it is replaced with an empty
/// result.
/// If a result already exists for the coinbase transaction at the height before the birthday,
/// it is replaced with an empty result. This can happen if the user increases the birthday
/// height.
///
/// TODO: ignore incorrect changes to birthday heights
pub(crate) fn insert_sapling_key(
&mut self,
storage: &Storage,
sapling_key: SaplingScanningKey,
sapling_key: &SaplingScanningKey,
birthday_height: Option<Height>,
) {
let min_birthday_height = storage.min_sapling_birthday_height();
@ -162,13 +208,10 @@ impl ScannerWriteBatch {
.unwrap_or(min_birthday_height)
.max(min_birthday_height);
// And we want to skip up to the height before it.
let skip_up_to_height = birthday_height.previous().unwrap_or(Height(0));
let skip_up_to_height = birthday_height.previous().unwrap_or(Height::MIN);
let index = SaplingScannedDatabaseIndex {
sapling_key,
height: skip_up_to_height,
};
self.zs_insert(&storage.sapling_tx_ids_cf(), index, Vec::new());
let index =
SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
}
}
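
As a worked example of the clamping above, with plain integers standing in for `Height` and a made-up minimum birthday value (the helper name is illustrative):

```rust
fn skip_up_to_height(configured_birthday: Option<u32>, min_birthday: u32) -> u32 {
    // Clamp the configured birthday up to the network's minimum Sapling birthday,
    // then mark the height just before it as already scanned.
    let birthday = configured_birthday
        .unwrap_or(min_birthday)
        .max(min_birthday);
    birthday.saturating_sub(1)
}

fn main() {
    // Assuming an illustrative minimum birthday of 419_200:
    assert_eq!(skip_up_to_height(None, 419_200), 419_199);
    assert_eq!(skip_up_to_height(Some(1_000), 419_200), 419_199); // clamped up
    assert_eq!(skip_up_to_height(Some(500_000), 419_200), 499_999);
}
```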

View File

@ -46,7 +46,7 @@ use zebra_chain::{
transparent::{CoinbaseData, Input},
work::{difficulty::CompactDifficulty, equihash::Solution},
};
use zebra_state::SaplingScannedResult;
use zebra_state::{SaplingScannedResult, TransactionIndex};
use crate::{
config::Config,
@ -189,7 +189,7 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> {
let mut s = crate::storage::Storage::new(&Config::ephemeral(), Network::Mainnet);
// Insert the generated key to the database
s.add_sapling_key(key_to_be_stored.clone(), None);
s.add_sapling_key(&key_to_be_stored, None);
// Check key was added
assert_eq!(s.sapling_keys().len(), 1);
@ -210,7 +210,11 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> {
let result = SaplingScannedResult::from(result.transactions()[0].txid.as_ref());
// Add result to database
s.add_sapling_result(key_to_be_stored.clone(), Height(1), vec![result]);
s.add_sapling_results(
key_to_be_stored.clone(),
Height(1),
[(TransactionIndex::from_usize(0), result)].into(),
);
// Check the result was added
assert_eq!(

View File

@ -56,7 +56,7 @@ pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
check, init, spawn_init,
watch_receiver::WatchReceiver,
OutputIndex, OutputLocation, TransactionLocation,
OutputIndex, OutputLocation, TransactionIndex, TransactionLocation,
};
#[cfg(feature = "shielded-scan")]

View File

@ -85,7 +85,7 @@ pub mod arbitrary;
#[cfg(test)]
mod tests;
pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation};
pub use finalized_state::{OutputIndex, OutputLocation, TransactionIndex, TransactionLocation};
use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};

View File

@ -42,7 +42,8 @@ mod tests;
pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
#[allow(unused_imports)]
pub use disk_format::{
FromDisk, IntoDisk, OutputIndex, OutputLocation, TransactionLocation, MAX_ON_DISK_HEIGHT,
FromDisk, IntoDisk, OutputIndex, OutputLocation, TransactionIndex, TransactionLocation,
MAX_ON_DISK_HEIGHT,
};
pub use zebra_db::ZebraDb;

View File

@ -19,7 +19,7 @@ pub mod scan;
#[cfg(test)]
mod tests;
pub use block::{TransactionLocation, MAX_ON_DISK_HEIGHT};
pub use block::{TransactionIndex, TransactionLocation, MAX_ON_DISK_HEIGHT};
pub use transparent::{OutputIndex, OutputLocation};
#[cfg(feature = "shielded-scan")]

View File

@ -89,7 +89,7 @@ impl TransactionIndex {
)
}
/// Returns this index as a `usize`
/// Returns this index as a `usize`.
pub fn as_usize(&self) -> usize {
self.0.into()
}
@ -103,11 +103,21 @@ impl TransactionIndex {
)
}
/// Returns this index as a `u64`
/// Returns this index as a `u64`.
#[allow(dead_code)]
pub fn as_u64(&self) -> u64 {
self.0.into()
}
/// The minimum value of a transaction index.
///
/// This value corresponds to the coinbase transaction.
pub const MIN: Self = Self(u16::MIN);
/// The maximum value of a transaction index.
///
/// This value corresponds to the highest possible transaction index.
pub const MAX: Self = Self(u16::MAX);
}
/// A transaction's location in the chain, by block height and transaction index.
@ -127,6 +137,11 @@ pub struct TransactionLocation {
}
impl TransactionLocation {
/// Creates a transaction location from a block height and transaction index.
pub fn from_parts(height: Height, index: TransactionIndex) -> TransactionLocation {
TransactionLocation { height, index }
}
/// Creates a transaction location from a block height and transaction index.
pub fn from_index(height: Height, transaction_index: u16) -> TransactionLocation {
TransactionLocation {
@ -150,6 +165,42 @@ impl TransactionLocation {
index: TransactionIndex::from_u64(transaction_index),
}
}
/// The minimum value of a transaction location.
///
/// This value corresponds to the genesis coinbase transaction.
pub const MIN: Self = Self {
height: Height::MIN,
index: TransactionIndex::MIN,
};
/// The maximum value of a transaction location.
///
/// This value corresponds to the last transaction in the highest possible block.
pub const MAX: Self = Self {
height: Height::MAX,
index: TransactionIndex::MAX,
};
/// The minimum value of a transaction location for `height`.
///
/// This value is the coinbase transaction.
pub const fn min_for_height(height: Height) -> Self {
Self {
height,
index: TransactionIndex::MIN,
}
}
/// The maximum value of a transaction location for `height`.
///
/// This value can be a valid entry, but it won't fit in a 2MB block.
pub const fn max_for_height(height: Height) -> Self {
Self {
height,
index: TransactionIndex::MAX,
}
}
}
// Block and transaction trait impls

View File

@ -4,23 +4,14 @@
//!
//! # Correctness
//!
//! Once format versions are implemented for the scanner database,
//! `zebra_scan::Storage::database_format_version_in_code()` must be incremented
//! each time the database format (column, serialization, etc) changes.
use zebra_chain::{block::Height, transaction};
use crate::{FromDisk, IntoDisk};
use crate::{FromDisk, IntoDisk, TransactionLocation};
use super::block::HEIGHT_DISK_BYTES;
/// The fixed length of the scanning result.
///
/// TODO: If the scanning result doesn't have a fixed length, either:
/// - deserialize using internal length or end markers,
/// - prefix it with a length, or
/// - stop storing vectors of results on disk, instead store each result with a unique key.
pub const SAPLING_SCANNING_RESULT_LENGTH: usize = 32;
use super::block::TRANSACTION_LOCATION_DISK_BYTES;
/// The type used in Zebra to store Sapling scanning keys.
/// It can represent a full viewing key or an individual viewing key.
@ -52,7 +43,7 @@ pub struct SaplingScannedDatabaseEntry {
pub index: SaplingScannedDatabaseIndex,
/// The database column family value.
pub value: Vec<SaplingScannedResult>,
pub value: Option<SaplingScannedResult>,
}
/// A database column family key for a block scanned with a Sapling viewing key.
@ -61,39 +52,62 @@ pub struct SaplingScannedDatabaseIndex {
/// The Sapling viewing key used to scan the block.
pub sapling_key: SaplingScanningKey,
/// The height of the block.
pub height: Height,
/// The transaction location: block height and transaction index.
pub tx_loc: TransactionLocation,
}
impl SaplingScannedDatabaseIndex {
/// The minimum value of a sapling scanned database index.
///
/// This value is guaranteed to be the minimum, and does not correspond to a valid key.
//
// Note: to calculate the maximum value, we need a key length.
pub const fn min() -> Self {
Self {
// The empty string is the minimum value in RocksDB lexicographic order.
sapling_key: String::new(),
// Genesis is the minimum height, and never has valid shielded transfers.
height: Height(0),
tx_loc: TransactionLocation::MIN,
}
}
/// The minimum value of a sapling scanned database index for `sapling_key`.
/// This value is guaranteed to be the minimum, and does not correspond to a valid entry.
///
/// This value does not correspond to a valid entry.
/// (The genesis coinbase transaction does not have shielded transfers.)
pub fn min_for_key(sapling_key: &SaplingScanningKey) -> Self {
Self {
sapling_key: sapling_key.clone(),
// Genesis is the minimum height, and never has valid shielded transfers.
height: Height(0),
tx_loc: TransactionLocation::MIN,
}
}
/// The maximum value of a sapling scanned database index for `sapling_key`.
/// This value is guaranteed to be the maximum, and does not correspond to a valid entry.
///
/// This value may correspond to a valid entry, but it won't be mined for many decades.
pub fn max_for_key(sapling_key: &SaplingScanningKey) -> Self {
Self {
sapling_key: sapling_key.clone(),
// The maximum height will never be mined - we'll increase it before that happens.
height: Height::MAX,
tx_loc: TransactionLocation::MAX,
}
}
/// The minimum value of a sapling scanned database index for `sapling_key` and `height`.
///
/// This value can be a valid entry for shielded coinbase.
pub fn min_for_key_and_height(sapling_key: &SaplingScanningKey, height: Height) -> Self {
Self {
sapling_key: sapling_key.clone(),
tx_loc: TransactionLocation::min_for_height(height),
}
}
/// The maximum value of a sapling scanned database index for `sapling_key` and `height`.
///
/// This value can be a valid entry, but it won't fit in a 2MB block.
pub fn max_for_key_and_height(sapling_key: &SaplingScanningKey, height: Height) -> Self {
Self {
sapling_key: sapling_key.clone(),
tx_loc: TransactionLocation::max_for_height(height),
}
}
}
@ -120,7 +134,7 @@ impl IntoDisk for SaplingScannedDatabaseIndex {
let mut bytes = Vec::new();
bytes.extend(self.sapling_key.as_bytes());
bytes.extend(self.height.as_bytes());
bytes.extend(self.tx_loc.as_bytes());
bytes
}
@ -130,11 +144,11 @@ impl FromDisk for SaplingScannedDatabaseIndex {
fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
let bytes = bytes.as_ref();
let (sapling_key, height) = bytes.split_at(bytes.len() - HEIGHT_DISK_BYTES);
let (sapling_key, tx_loc) = bytes.split_at(bytes.len() - TRANSACTION_LOCATION_DISK_BYTES);
Self {
sapling_key: SaplingScanningKey::from_bytes(sapling_key),
height: Height::from_bytes(height),
tx_loc: TransactionLocation::from_bytes(tx_loc),
}
}
}
@ -153,22 +167,28 @@ impl FromDisk for SaplingScannedResult {
}
}
impl IntoDisk for Vec<SaplingScannedResult> {
impl IntoDisk for Option<SaplingScannedResult> {
type Bytes = Vec<u8>;
fn as_bytes(&self) -> Self::Bytes {
self.iter()
.flat_map(SaplingScannedResult::as_bytes)
.collect()
let mut bytes = Vec::new();
if let Some(result) = self.as_ref() {
bytes.extend(result.as_bytes());
}
bytes
}
}
impl FromDisk for Vec<SaplingScannedResult> {
impl FromDisk for Option<SaplingScannedResult> {
fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
bytes
.as_ref()
.chunks(SAPLING_SCANNING_RESULT_LENGTH)
.map(SaplingScannedResult::from_bytes)
.collect()
let bytes = bytes.as_ref();
if bytes.is_empty() {
None
} else {
Some(SaplingScannedResult::from_bytes(bytes))
}
}
}
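
The zero-length-bytes convention round-trips cleanly. A small standalone check with 32-byte arrays standing in for `SaplingScannedResult` (the function names are illustrative, not the real trait methods):

```rust
fn to_disk(result: &Option<[u8; 32]>) -> Vec<u8> {
    // `None` becomes a zero-length value; `Some` becomes the raw 32 bytes.
    result.map(|result| result.to_vec()).unwrap_or_default()
}

fn from_disk(bytes: &[u8]) -> Option<[u8; 32]> {
    if bytes.is_empty() {
        None
    } else {
        Some(bytes.try_into().expect("stored results are exactly 32 bytes"))
    }
}

fn main() {
    for result in [None, Some([0xab; 32])] {
        assert_eq!(from_disk(&to_disk(&result)), result);
    }
}
```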