Abstract over data access in scan_cached_blocks.

This commit is contained in:
Kris Nuttycombe 2020-08-20 11:41:43 -06:00
parent 06c1772692
commit d16c124ffe
5 changed files with 384 additions and 248 deletions

View File

@ -25,18 +25,17 @@ pub const ANCHOR_OFFSET: u32 = 10;
/// - `Err(e)` if there was an error during validation unrelated to chain validity.
///
/// This function does not mutate either of the databases.
pub fn validate_combined_chain<
E0,
N,
pub fn validate_combined_chain<'db, E0, N, E, P, C, D>(
parameters: &P,
cache: &C,
data: &'db D,
) -> Result<(), E>
where
E: From<Error<E0, N>>,
P: consensus::Parameters,
C: CacheOps<Error = E>,
D: DBOps<Error = E>,
>(
parameters: &P,
cache: &C,
data: &D,
) -> Result<(), E> {
&'db D: DBOps<Error = E>,
{
let sapling_activation_height = parameters
.activation_height(NetworkUpgrade::Sapling)
.ok_or(Error::SaplingNotActive)?;
@ -85,9 +84,13 @@ pub fn validate_combined_chain<
/// Determines the target height for a transaction, and the height from which to
/// select anchors, based on the current synchronised block chain.
pub fn get_target_and_anchor_heights<E0, N, E: From<Error<E0, N>>, D: DBOps<Error = E>>(
data: &D,
) -> Result<(BlockHeight, BlockHeight), E> {
pub fn get_target_and_anchor_heights<'db, E0, N, E, D>(
data: &'db D,
) -> Result<(BlockHeight, BlockHeight), E>
where
E: From<Error<E0, N>>,
&'db D: DBOps<Error = E>,
{
data.block_height_extrema().and_then(|heights| {
match heights {
Some((min_height, max_height)) => {

View File

@ -8,17 +8,18 @@ use zcash_primitives::{
zip32::ExtendedFullViewingKey,
};
use crate::proto::compact_formats::CompactBlock;
use crate::{
proto::compact_formats::CompactBlock,
wallet::{AccountId, WalletShieldedOutput, WalletTx},
};
pub mod chain;
pub mod error;
pub trait DBOps {
type Error;
type AccountId;
type NoteId;
// type TxRef; // Backend-specific transaction handle
// type NoteRef; // Backend-specific note identifier`
type NoteRef: Copy; // Backend-specific note identifier
type Mutator: DBUpdate<Error = Self::Error, NoteRef = Self::NoteRef>;
fn init_db(&self) -> Result<(), Self::Error>;
@ -49,19 +50,19 @@ pub trait DBOps {
fn get_address<P: consensus::Parameters>(
&self,
params: &P,
account: Self::AccountId,
account: AccountId,
) -> Result<Option<PaymentAddress>, Self::Error>;
fn get_balance(&self, account: Self::AccountId) -> Result<Amount, Self::Error>;
fn get_balance(&self, account: AccountId) -> Result<Amount, Self::Error>;
fn get_verified_balance(&self, account: Self::AccountId) -> Result<Amount, Self::Error>;
fn get_verified_balance(&self, account: AccountId) -> Result<Amount, Self::Error>;
fn get_received_memo_as_utf8(
&self,
id_note: Self::NoteId,
id_note: Self::NoteRef,
) -> Result<Option<String>, Self::Error>;
fn get_sent_memo_as_utf8(&self, id_note: Self::NoteId) -> Result<Option<String>, Self::Error>;
fn get_sent_memo_as_utf8(&self, id_note: Self::NoteRef) -> Result<Option<String>, Self::Error>;
fn get_extended_full_viewing_keys<P: consensus::Parameters>(
&self,
@ -76,35 +77,57 @@ pub trait DBOps {
fn get_witnesses(
&self,
block_height: BlockHeight,
) -> Result<Vec<(Self::NoteId, IncrementalWitness<Node>)>, Self::Error>;
) -> Result<Vec<(Self::NoteRef, IncrementalWitness<Node>)>, Self::Error>;
fn get_nullifiers(&self) -> Result<Vec<(Vec<u8>, Self::AccountId)>, Self::Error>;
fn get_nullifiers(&self) -> Result<Vec<(Vec<u8>, AccountId)>, Self::Error>;
fn get_mutator(&self) -> Result<Self::Mutator, Self::Error>;
fn transactionally<F>(&self, mutator: &mut Self::Mutator, f: F) -> Result<(), Self::Error>
where
F: FnOnce(&mut Self::Mutator) -> Result<(), Self::Error>;
// fn get_witnesses(block_height: BlockHeight) -> Result<Box<dyn Iterator<Item = IncrementalWitness<Node>>>, Self::Error>;
//
// fn get_nullifiers() -> Result<(Vec<u8>, Account), Self::Error>;
//
// fn create_block(block_height: BlockHeight, hash: BlockHash, time: u32, sapling_tree: CommitmentTree<Node>) -> Result<(), Self::Error>;
//
// fn put_transaction(transaction: Transaction, block_height: BlockHeight) -> Result<Self::TxRef, Self::Error>;
//
// fn get_txref(txid: TxId) -> Result<Option<Self::TxRef>, Self::Error>;
//
// fn mark_spent(tx_ref: Self::TxRef, nullifier: Vec<u8>) -> Result<(), Self::Error>;
//
// fn put_note(output: WalletShieldedOutput, tx_ref: Self::TxRef, nullifier: Vec<u8>) -> Result<(), Self::Error>;
//
// fn get_note(tx_ref: Self::TxRef, output_index: i64) -> Result<Self::NoteRef, Self::Error>;
//
// fn prune_witnesses(to_height: BlockHeight) -> Result<(), Self::Error>;
//
// fn mark_expired_unspent(to_height: BlockHeight) -> Result<(), Self::Error>;
//
// fn put_sent_note(tx_ref: Self::TxRef, output: DecryptedOutput) -> Result<(), Self::Error>;
//
// fn put_received_note(tx_ref: Self::TxRef, output: DecryptedOutput) -> Result<(), Self::Error>;
}
pub trait DBUpdate {
type Error;
type TxRef: Copy;
type NoteRef: Copy;
fn insert_block(
&mut self,
block_height: BlockHeight,
block_hash: BlockHash,
block_time: u32,
commitment_tree: &CommitmentTree<Node>,
) -> Result<(), Self::Error>;
fn put_tx(&mut self, tx: &WalletTx, height: BlockHeight) -> Result<Self::TxRef, Self::Error>;
fn mark_spent(&mut self, tx_ref: Self::TxRef, nf: &Vec<u8>) -> Result<(), Self::Error>;
fn put_note(
&mut self,
output: &WalletShieldedOutput,
nf: &Vec<u8>,
tx_ref: Self::TxRef,
) -> Result<Self::NoteRef, Self::Error>;
fn insert_witness(
&mut self,
note_id: Self::NoteRef,
witness: &IncrementalWitness<Node>,
height: BlockHeight,
) -> Result<(), Self::Error>;
fn prune_witnesses(&mut self, from_height: BlockHeight) -> Result<(), Self::Error>;
fn update_expired_notes(&mut self, from_height: BlockHeight) -> Result<(), Self::Error>;
}
pub trait CacheOps {
type Error;

View File

@ -8,6 +8,11 @@ use zcash_primitives::{
transaction::TxId,
};
/// A type-safe wrapper for account identifiers.
#[derive(Debug, Copy, Clone)]
pub struct AccountId(pub u32);
/// A subset of a [`Transaction`] relevant to wallets and light clients.
///
/// [`Transaction`]: zcash_primitives::transaction::Transaction

View File

@ -27,7 +27,9 @@
use std::fmt;
use std::path::Path;
use rusqlite::Connection;
use rusqlite::{types::ToSql, Connection, Statement, NO_PARAMS};
use ff::PrimeField;
use zcash_primitives::{
block::BlockHash,
@ -40,9 +42,10 @@ use zcash_primitives::{
};
use zcash_client_backend::{
data_api::{CacheOps, DBOps},
data_api::{CacheOps, DBOps, DBUpdate},
encoding::encode_payment_address,
proto::compact_formats::CompactBlock,
wallet::{AccountId, WalletShieldedOutput, WalletTx},
};
use crate::error::SqliteClientError;
@ -54,9 +57,6 @@ pub mod query;
pub mod scan;
pub mod transact;
#[derive(Debug, Copy, Clone)]
pub struct AccountId(pub u32);
#[derive(Debug, Copy, Clone)]
pub struct NoteId(pub i64);
@ -74,10 +74,10 @@ impl DataConnection {
}
}
impl DBOps for DataConnection {
impl<'a> DBOps for &'a DataConnection {
type Error = SqliteClientError;
type AccountId = AccountId;
type NoteId = NoteId;
type NoteRef = NoteId;
type Mutator = DBMutator<'a>;
fn init_db(&self) -> Result<(), Self::Error> {
init::init_data_database(self).map_err(SqliteClientError::from)
@ -120,27 +120,27 @@ impl DBOps for DataConnection {
fn get_address<P: consensus::Parameters>(
&self,
params: &P,
account: Self::AccountId,
account: AccountId,
) -> Result<Option<PaymentAddress>, Self::Error> {
query::get_address(self, params, account)
}
fn get_balance(&self, account: Self::AccountId) -> Result<Amount, Self::Error> {
fn get_balance(&self, account: AccountId) -> Result<Amount, Self::Error> {
query::get_balance(self, account)
}
fn get_verified_balance(&self, account: Self::AccountId) -> Result<Amount, Self::Error> {
fn get_verified_balance(&self, account: AccountId) -> Result<Amount, Self::Error> {
query::get_verified_balance(self, account)
}
fn get_received_memo_as_utf8(
&self,
id_note: Self::NoteId,
id_note: Self::NoteRef,
) -> Result<Option<String>, Self::Error> {
query::get_received_memo_as_utf8(self, id_note)
}
fn get_sent_memo_as_utf8(&self, id_note: Self::NoteId) -> Result<Option<String>, Self::Error> {
fn get_sent_memo_as_utf8(&self, id_note: Self::NoteRef) -> Result<Option<String>, Self::Error> {
query::get_sent_memo_as_utf8(self, id_note)
}
@ -161,13 +161,241 @@ impl DBOps for DataConnection {
fn get_witnesses(
&self,
block_height: BlockHeight,
) -> Result<Vec<(Self::NoteId, IncrementalWitness<Node>)>, Self::Error> {
) -> Result<Vec<(Self::NoteRef, IncrementalWitness<Node>)>, Self::Error> {
query::get_witnesses(self, block_height)
}
fn get_nullifiers(&self) -> Result<Vec<(Vec<u8>, Self::AccountId)>, Self::Error> {
fn get_nullifiers(&self) -> Result<Vec<(Vec<u8>, AccountId)>, Self::Error> {
query::get_nullifiers(self)
}
fn get_mutator(&self) -> Result<Self::Mutator, Self::Error> {
Ok(
DBMutator {
conn: self,
stmt_insert_block: self.0.prepare(
"INSERT INTO blocks (height, hash, time, sapling_tree)
VALUES (?, ?, ?, ?)",
)?,
stmt_insert_tx: self.0.prepare(
"INSERT INTO transactions (txid, block, tx_index)
VALUES (?, ?, ?)",
)?,
stmt_update_tx: self.0.prepare(
"UPDATE transactions
SET block = ?, tx_index = ? WHERE txid = ?",
)?,
stmt_select_tx: self.0.prepare(
"SELECT id_tx FROM transactions WHERE txid = ?",
)?,
stmt_mark_spent_note: self.0.prepare(
"UPDATE received_notes SET spent = ? WHERE nf = ?"
)?,
stmt_update_note: self.0.prepare(
"UPDATE received_notes
SET account = ?, diversifier = ?, value = ?, rcm = ?, nf = ?, is_change = ?
WHERE tx = ? AND output_index = ?",
)?,
stmt_insert_note: self.0.prepare(
"INSERT INTO received_notes (tx, output_index, account, diversifier, value, rcm, nf, is_change)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
)?,
stmt_select_note: self.0.prepare(
"SELECT id_note FROM received_notes WHERE tx = ? AND output_index = ?"
)?,
stmt_insert_witness: self.0.prepare(
"INSERT INTO sapling_witnesses (note, block, witness)
VALUES (?, ?, ?)",
)?,
stmt_prune_witnesses: self.0.prepare(
"DELETE FROM sapling_witnesses WHERE block < ?"
)?,
stmt_update_expired: self.0.prepare(
"UPDATE received_notes SET spent = NULL WHERE EXISTS (
SELECT id_tx FROM transactions
WHERE id_tx = received_notes.spent AND block IS NULL AND expiry_height < ?
)",
)?,
}
)
}
fn transactionally<F>(&self, mutator: &mut Self::Mutator, f: F) -> Result<(), Self::Error>
where
F: FnOnce(&mut Self::Mutator) -> Result<(), Self::Error>,
{
self.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
match f(mutator) {
Ok(_) => {
self.0.execute("COMMIT", NO_PARAMS)?;
Ok(())
}
Err(error) => {
match self.0.execute("ROLLBACK", NO_PARAMS) {
Ok(_) => Err(error),
// REVIEW: If rollback fails, what do we want to do? I think that
// panicking here is probably the right thing to do, because it
// means the database is corrupt?
Err(e) => panic!(
"Rollback failed with error {} while attempting to recover from error {}; database is likely corrupt.",
e,
error.0
)
}
}
}
}
}
pub struct DBMutator<'a> {
conn: &'a DataConnection,
stmt_insert_block: Statement<'a>,
stmt_insert_tx: Statement<'a>,
stmt_update_tx: Statement<'a>,
stmt_select_tx: Statement<'a>,
stmt_mark_spent_note: Statement<'a>,
stmt_insert_note: Statement<'a>,
stmt_update_note: Statement<'a>,
stmt_select_note: Statement<'a>,
stmt_insert_witness: Statement<'a>,
stmt_prune_witnesses: Statement<'a>,
stmt_update_expired: Statement<'a>,
}
impl<'a> DBUpdate for DBMutator<'a> {
type Error = SqliteClientError;
type TxRef = i64;
type NoteRef = NoteId;
fn insert_block(
&mut self,
block_height: BlockHeight,
block_hash: BlockHash,
block_time: u32,
commitment_tree: &CommitmentTree<Node>,
) -> Result<(), Self::Error> {
let mut encoded_tree = Vec::new();
commitment_tree
.write(&mut encoded_tree)
.expect("Should be able to write to a Vec");
self.stmt_insert_block.execute(&[
u32::from(block_height).to_sql()?,
block_hash.0.to_sql()?,
block_time.to_sql()?,
encoded_tree.to_sql()?,
])?;
Ok(())
}
fn put_tx(&mut self, tx: &WalletTx, height: BlockHeight) -> Result<Self::TxRef, Self::Error> {
let txid = tx.txid.0.to_vec();
if self.stmt_update_tx.execute(&[
u32::from(height).to_sql()?,
(tx.index as i64).to_sql()?,
txid.to_sql()?,
])? == 0
{
// It isn't there, so insert our transaction into the database.
self.stmt_insert_tx.execute(&[
txid.to_sql()?,
u32::from(height).to_sql()?,
(tx.index as i64).to_sql()?,
])?;
Ok(self.conn.0.last_insert_rowid())
} else {
// It was there, so grab its row number.
self.stmt_select_tx
.query_row(&[txid], |row| row.get(0))
.map_err(SqliteClientError::from)
}
}
fn mark_spent(&mut self, tx_ref: Self::TxRef, nf: &Vec<u8>) -> Result<(), Self::Error> {
self.stmt_mark_spent_note
.execute(&[tx_ref.to_sql()?, nf.to_sql()?])?;
Ok(())
}
fn put_note(
&mut self,
output: &WalletShieldedOutput,
nf: &Vec<u8>,
tx_ref: Self::TxRef,
) -> Result<Self::NoteRef, Self::Error> {
let rcm = output.note.rcm().to_repr();
// Assumptions:
// - A transaction will not contain more than 2^63 shielded outputs.
// - A note value will never exceed 2^63 zatoshis.
// First try updating an existing received note into the database.
if self.stmt_update_note.execute(&[
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
tx_ref.to_sql()?,
(output.index as i64).to_sql()?,
])? == 0
{
// It isn't there, so insert our note into the database.
self.stmt_insert_note.execute(&[
tx_ref.to_sql()?,
(output.index as i64).to_sql()?,
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
])?;
Ok(NoteId(self.conn.0.last_insert_rowid()))
} else {
// It was there, so grab its row number.
self.stmt_select_note
.query_row(
&[tx_ref.to_sql()?, (output.index as i64).to_sql()?],
|row| row.get(0).map(NoteId),
)
.map_err(SqliteClientError::from)
}
}
fn insert_witness(
&mut self,
note_id: Self::NoteRef,
witness: &IncrementalWitness<Node>,
height: BlockHeight,
) -> Result<(), Self::Error> {
let mut encoded = Vec::new();
witness
.write(&mut encoded)
.expect("Should be able to write to a Vec");
self.stmt_insert_witness.execute(&[
note_id.0.to_sql()?,
u32::from(height).to_sql()?,
encoded.to_sql()?,
])?;
Ok(())
}
fn prune_witnesses(&mut self, below_height: BlockHeight) -> Result<(), Self::Error> {
self.stmt_prune_witnesses
.execute(&[u32::from(below_height)])?;
Ok(())
}
fn update_expired_notes(&mut self, height: BlockHeight) -> Result<(), Self::Error> {
self.stmt_update_expired.execute(&[u32::from(height)])?;
Ok(())
}
}
pub struct CacheConnection(Connection);

View File

@ -5,38 +5,33 @@ use protobuf::parse_from_bytes;
use rusqlite::{types::ToSql, OptionalExtension, NO_PARAMS};
use zcash_primitives::{
block::BlockHash,
consensus::{self, BlockHeight, NetworkUpgrade},
merkle_tree::CommitmentTree,
transaction::Transaction,
};
use zcash_client_backend::{
address::RecipientAddress,
data_api::{
error::{ChainInvalid, Error},
CacheOps, DBOps,
CacheOps, DBOps, DBUpdate,
},
decrypt_transaction,
encoding::decode_extended_full_viewing_key,
proto::compact_formats::CompactBlock,
wallet::WalletTx,
welding_rig::scan_block,
};
use zcash_primitives::{
consensus::{self, BlockHeight, NetworkUpgrade},
merkle_tree::{CommitmentTree, IncrementalWitness},
sapling::Node,
transaction::Transaction,
};
use crate::{error::SqliteClientError, AccountId, CacheConnection, DataConnection, NoteId};
use crate::{error::SqliteClientError, AccountId, CacheConnection, DataConnection};
struct CompactBlockRow {
height: BlockHeight,
data: Vec<u8>,
}
#[derive(Clone)]
struct WitnessRow {
id_note: i64,
witness: IncrementalWitness<Node>,
}
/// Scans at most `limit` new blocks added to the cache for any transactions received by
/// the tracked accounts.
///
@ -79,12 +74,19 @@ struct WitnessRow {
/// ```
///
/// [`init_blocks_table`]: crate::init::init_blocks_table
pub fn scan_cached_blocks<P: consensus::Parameters>(
pub fn scan_cached_blocks<'db, E, E0, N, P, C, D>(
params: &P,
cache: &CacheConnection,
data: &DataConnection,
cache: &C,
data: &'db D,
limit: Option<u32>,
) -> Result<(), SqliteClientError> {
) -> Result<(), E>
where
P: consensus::Parameters,
C: CacheOps<Error = E>,
&'db D: DBOps<Error = E, NoteRef = N>,
N: Copy,
E: From<Error<E0, N>>,
{
let sapling_activation_height = params
.activation_height(NetworkUpgrade::Sapling)
.ok_or(Error::SaplingNotActive)?;
@ -110,71 +112,20 @@ pub fn scan_cached_blocks<P: consensus::Parameters>(
// Get the nullifiers for the notes we are tracking
let mut nullifiers = data.get_nullifiers()?;
// Prepare per-block SQL statements
let mut stmt_insert_block = data.0.prepare(
"INSERT INTO blocks (height, hash, time, sapling_tree)
VALUES (?, ?, ?, ?)",
)?;
let mut stmt_update_tx = data.0.prepare(
"UPDATE transactions
SET block = ?, tx_index = ? WHERE txid = ?",
)?;
let mut stmt_insert_tx = data.0.prepare(
"INSERT INTO transactions (txid, block, tx_index)
VALUES (?, ?, ?)",
)?;
let mut stmt_select_tx = data
.0
.prepare("SELECT id_tx FROM transactions WHERE txid = ?")?;
let mut stmt_mark_spent_note = data
.0
.prepare("UPDATE received_notes SET spent = ? WHERE nf = ?")?;
let mut stmt_update_note = data.0.prepare(
"UPDATE received_notes
SET account = ?, diversifier = ?, value = ?, rcm = ?, nf = ?, is_change = ?
WHERE tx = ? AND output_index = ?",
)?;
let mut stmt_insert_note = data.0.prepare(
"INSERT INTO received_notes (tx, output_index, account, diversifier, value, rcm, nf, is_change)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
)?;
let mut stmt_select_note = data
.0
.prepare("SELECT id_note FROM received_notes WHERE tx = ? AND output_index = ?")?;
let mut stmt_insert_witness = data.0.prepare(
"INSERT INTO sapling_witnesses (note, block, witness)
VALUES (?, ?, ?)",
)?;
let mut stmt_prune_witnesses = data
.0
.prepare("DELETE FROM sapling_witnesses WHERE block < ?")?;
let mut stmt_update_expired = data.0.prepare(
"UPDATE received_notes SET spent = NULL WHERE EXISTS (
SELECT id_tx FROM transactions
WHERE id_tx = received_notes.spent AND block IS NULL AND expiry_height < ?
)",
)?;
cache.with_cached_blocks(
last_height,
limit,
|height: BlockHeight, block: CompactBlock| {
// Start an SQL transaction for this block.
data.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
// Scanned blocks MUST be height-sequential.
if height != (last_height + 1) {
return Err(SqliteClientError(ChainInvalid::block_height_mismatch(
last_height + 1,
height,
)));
return Err(ChainInvalid::block_height_mismatch(last_height + 1, height).into());
}
last_height = height;
let block_hash = block.hash.clone();
let block_hash = BlockHash::from_slice(&block.hash);
let block_time = block.time;
let txs = {
let txs: Vec<WalletTx> = {
let nf_refs: Vec<_> = nullifiers
.iter()
.map(|(nf, acc)| (&nf[..], acc.0 as usize))
@ -196,10 +147,7 @@ pub fn scan_cached_blocks<P: consensus::Parameters>(
let cur_root = tree.root();
for row in &witnesses {
if row.1.root() != cur_root {
return Err(SqliteClientError(Error::InvalidWitnessAnchor(
row.0,
last_height,
)));
return Err(Error::InvalidWitnessAnchor(row.0, last_height).into());
}
}
for tx in &txs {
@ -217,126 +165,55 @@ pub fn scan_cached_blocks<P: consensus::Parameters>(
}
}
// Insert the block into the database.
let mut encoded_tree = Vec::new();
tree.write(&mut encoded_tree)
.expect("Should be able to write to a Vec");
stmt_insert_block.execute(&[
u32::from(height).to_sql()?,
block_hash.to_sql()?,
block_time.to_sql()?,
encoded_tree.to_sql()?,
])?;
// database updates for each block are transactional
data.transactionally(&mut data.get_mutator()?, |mutator| {
// Insert the block into the database.
mutator.insert_block(height, block_hash, block_time, &tree)?;
for tx in txs {
// First try update an existing transaction in the database.
let txid = tx.txid.0.to_vec();
let tx_row = if stmt_update_tx.execute(&[
u32::from(height).to_sql()?,
(tx.index as i64).to_sql()?,
txid.to_sql()?,
])? == 0
{
// It isn't there, so insert our transaction into the database.
stmt_insert_tx.execute(&[
txid.to_sql()?,
u32::from(height).to_sql()?,
(tx.index as i64).to_sql()?,
])?;
data.0.last_insert_rowid()
} else {
// It was there, so grab its row number.
stmt_select_tx.query_row(&[txid], |row| row.get(0))?
};
for tx in txs {
let tx_row = mutator.put_tx(&tx, height)?;
// Mark notes as spent and remove them from the scanning cache
for spend in &tx.shielded_spends {
stmt_mark_spent_note.execute(&[tx_row.to_sql()?, spend.nf.to_sql()?])?;
// Mark notes as spent and remove them from the scanning cache
for spend in &tx.shielded_spends {
mutator.mark_spent(tx_row, &spend.nf)?;
}
nullifiers.retain(|(nf, _acc)| {
tx.shielded_spends
.iter()
.find(|spend| &spend.nf == nf)
.is_none()
});
for output in tx.shielded_outputs {
let nf = output.note.nf(
&extfvks[output.account].fvk.vk,
output.witness.position() as u64,
);
let note_id = mutator.put_note(&output, &nf, tx_row)?;
// Save witness for note.
witnesses.push((note_id, output.witness));
// Cache nullifier for note (to detect subsequent spends in this scan).
nullifiers.push((nf, AccountId(output.account as u32)));
}
}
nullifiers.retain(|(nf, _acc)| {
tx.shielded_spends
.iter()
.find(|spend| &spend.nf == nf)
.is_none()
});
for output in tx.shielded_outputs {
let rcm = output.note.rcm().to_repr();
let nf = output.note.nf(
&extfvks[output.account].fvk.vk,
output.witness.position() as u64,
);
// Assumptions:
// - A transaction will not contain more than 2^63 shielded outputs.
// - A note value will never exceed 2^63 zatoshis.
// First try updating an existing received note into the database.
let note_id = if stmt_update_note.execute(&[
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
tx_row.to_sql()?,
(output.index as i64).to_sql()?,
])? == 0
{
// It isn't there, so insert our note into the database.
stmt_insert_note.execute(&[
tx_row.to_sql()?,
(output.index as i64).to_sql()?,
(output.account as i64).to_sql()?,
output.to.diversifier().0.to_sql()?,
(output.note.value as i64).to_sql()?,
rcm.as_ref().to_sql()?,
nf.to_sql()?,
output.is_change.to_sql()?,
])?;
NoteId(data.0.last_insert_rowid())
} else {
// It was there, so grab its row number.
stmt_select_note.query_row(
&[tx_row.to_sql()?, (output.index as i64).to_sql()?],
|row| row.get(0).map(NoteId),
)?
};
// Save witness for note.
witnesses.push((note_id, output.witness));
// Cache nullifier for note (to detect subsequent spends in this scan).
nullifiers.push((nf, AccountId(output.account as u32)));
// Insert current witnesses into the database.
for (note_id, witness) in witnesses.iter() {
mutator.insert_witness(*note_id, witness, last_height)?;
}
}
// Insert current witnesses into the database.
let mut encoded = Vec::new();
for witness_row in witnesses.iter() {
encoded.clear();
witness_row
.1
.write(&mut encoded)
.expect("Should be able to write to a Vec");
stmt_insert_witness.execute(&[
(witness_row.0).0.to_sql()?,
u32::from(last_height).to_sql()?,
encoded.to_sql()?,
])?;
}
// Prune the stored witnesses (we only expect rollbacks of at most 100 blocks).
mutator.prune_witnesses(last_height - 100)?;
// Prune the stored witnesses (we only expect rollbacks of at most 100 blocks).
stmt_prune_witnesses.execute(&[u32::from(last_height - 100)])?;
// Update now-expired transactions that didn't get mined.
mutator.update_expired_notes(last_height)?;
// Update now-expired transactions that didn't get mined.
stmt_update_expired.execute(&[u32::from(last_height)])?;
// Commit the SQL transaction, writing this block's data atomically.
data.0.execute("COMMIT", NO_PARAMS)?;
Ok(())
Ok(())
})
},
)?;