Clean up transactional API.

This API is still somewhat unsafe in that it doesn't inhibit nested
transactions, but it's better than it was.
This commit is contained in:
Kris Nuttycombe 2020-08-26 17:52:21 -06:00
parent cd2729bbd0
commit 4c2cda48e6
5 changed files with 88 additions and 91 deletions

View File

@ -223,16 +223,17 @@ where
}
// database updates for each block are transactional
data.transactionally(&mut data.get_update_ops()?, |db_update| {
let mut db_update = data.get_update_ops()?;
db_update.transactionally(|up| {
// Insert the block into the database.
db_update.insert_block(height, block_hash, block_time, &tree)?;
up.insert_block(height, block_hash, block_time, &tree)?;
for tx in txs {
let tx_row = db_update.put_tx_meta(&tx, height)?;
let tx_row = up.put_tx_meta(&tx, height)?;
// Mark notes as spent and remove them from the scanning cache
for spend in &tx.shielded_spends {
db_update.mark_spent(tx_row, &spend.nf)?;
up.mark_spent(tx_row, &spend.nf)?;
}
nullifiers.retain(|(nf, _acc)| {
@ -248,7 +249,7 @@ where
output.witness.position() as u64,
);
let note_id = db_update.put_received_note(&output, Some(&nf), tx_row)?;
let note_id = up.put_received_note(&output, Some(&nf), tx_row)?;
// Save witness for note.
witnesses.push((note_id, output.witness));
@ -260,14 +261,14 @@ where
// Insert current witnesses into the database.
for (note_id, witness) in witnesses.iter() {
db_update.insert_witness(*note_id, witness, last_height)?;
up.insert_witness(*note_id, witness, last_height)?;
}
// Prune the stored witnesses (we only expect rollbacks of at most 100 blocks).
db_update.prune_witnesses(last_height - 100)?;
up.prune_witnesses(last_height - 100)?;
// Update now-expired transactions that didn't get mined.
db_update.update_expired_notes(last_height)?;
up.update_expired_notes(last_height)?;
Ok(())
})

View File

@ -70,12 +70,6 @@ pub trait DBOps {
fn get_tx_height(&self, txid: TxId) -> Result<Option<BlockHeight>, Self::Error>;
fn rewind_to_height<P: consensus::Parameters>(
&self,
parameters: &P,
block_height: BlockHeight,
) -> Result<(), Self::Error>;
fn get_address<P: consensus::Parameters>(
&self,
params: &P,
@ -121,8 +115,6 @@ pub trait DBOps {
fn get_nullifiers(&self) -> Result<Vec<(Vec<u8>, AccountId)>, Self::Error>;
fn get_update_ops(&self) -> Result<Self::UpdateOps, Self::Error>;
fn select_spendable_notes(
&self,
account: AccountId,
@ -130,9 +122,7 @@ pub trait DBOps {
anchor_height: BlockHeight,
) -> Result<Vec<SpendableNote>, Self::Error>;
fn transactionally<F, A>(&self, mutator: &mut Self::UpdateOps, f: F) -> Result<A, Self::Error>
where
F: FnOnce(&mut Self::UpdateOps) -> Result<A, Self::Error>;
fn get_update_ops(&self) -> Result<Self::UpdateOps, Self::Error>;
}
pub trait DBUpdate {
@ -140,6 +130,10 @@ pub trait DBUpdate {
type NoteRef: Copy;
type TxRef: Copy;
fn transactionally<F, A>(&mut self, f: F) -> Result<A, Self::Error>
where
F: FnOnce(&mut Self) -> Result<A, Self::Error>;
fn insert_block(
&mut self,
block_height: BlockHeight,
@ -148,6 +142,12 @@ pub trait DBUpdate {
commitment_tree: &CommitmentTree<Node>,
) -> Result<(), Self::Error>;
fn rewind_to_height<P: consensus::Parameters>(
&mut self,
parameters: &P,
block_height: BlockHeight,
) -> Result<(), Self::Error>;
fn put_tx_meta(
&mut self,
tx: &WalletTx,

View File

@ -53,7 +53,7 @@ where
let mut db_update = data.get_update_ops()?;
// Update the database atomically, to ensure the result is internally consistent.
data.transactionally(&mut db_update, |up| {
db_update.transactionally(|up| {
let tx_ref = up.put_tx_data(tx, None)?;
for output in outputs {
@ -235,34 +235,35 @@ where
// Update the database atomically, to ensure the result is internally consistent.
let mut db_update = data.get_update_ops().map_err(|e| e.into())?;
data.transactionally(&mut db_update, |up| {
let created = time::OffsetDateTime::now_utc();
let tx_ref = up.put_tx_data(&tx, Some(created))?;
db_update
.transactionally(|up| {
let created = time::OffsetDateTime::now_utc();
let tx_ref = up.put_tx_data(&tx, Some(created))?;
// Mark notes as spent.
//
// This locks the notes so they aren't selected again by a subsequent call to
// create_spend_to_address() before this transaction has been mined (at which point the notes
// get re-marked as spent).
//
// Assumes that create_spend_to_address() will never be called in parallel, which is a
// reasonable assumption for a light client such as a mobile phone.
for spend in &tx.shielded_spends {
up.mark_spent(tx_ref, &spend.nullifier)?;
}
// Mark notes as spent.
//
// This locks the notes so they aren't selected again by a subsequent call to
// create_spend_to_address() before this transaction has been mined (at which point the notes
// get re-marked as spent).
//
// Assumes that create_spend_to_address() will never be called in parallel, which is a
// reasonable assumption for a light client such as a mobile phone.
for spend in &tx.shielded_spends {
up.mark_spent(tx_ref, &spend.nullifier)?;
}
up.insert_sent_note(
params,
tx_ref,
output_index as usize,
account,
to,
value,
memo,
)?;
up.insert_sent_note(
params,
tx_ref,
output_index as usize,
account,
to,
value,
memo,
)?;
// Return the row number of the transaction, so the caller can fetch it for sending.
Ok(tx_ref)
})
.map_err(|e| e.into())
// Return the row number of the transaction, so the caller can fetch it for sending.
Ok(tx_ref)
})
.map_err(|e| e.into())
}

View File

@ -114,14 +114,6 @@ impl<'a> DBOps for &'a DataConnection {
wallet::get_tx_height(self, txid).map_err(SqliteClientError::from)
}
fn rewind_to_height<P: consensus::Parameters>(
&self,
parameters: &P,
block_height: BlockHeight,
) -> Result<(), Self::Error> {
wallet::rewind_to_height(self, parameters, block_height)
}
fn get_extended_full_viewing_keys<P: consensus::Parameters>(
&self,
params: &P,
@ -193,12 +185,7 @@ impl<'a> DBOps for &'a DataConnection {
target_value: Amount,
anchor_height: BlockHeight,
) -> Result<Vec<SpendableNote>, Self::Error> {
wallet::transact::select_spendable_notes(
self,
account,
target_value,
anchor_height,
)
wallet::transact::select_spendable_notes(self, account, target_value, anchor_height)
}
fn get_update_ops(&self) -> Result<Self::UpdateOps, Self::Error> {
@ -274,33 +261,6 @@ impl<'a> DBOps for &'a DataConnection {
}
)
}
fn transactionally<F, A>(&self, mutator: &mut Self::UpdateOps, f: F) -> Result<A, Self::Error>
where
F: FnOnce(&mut Self::UpdateOps) -> Result<A, Self::Error>,
{
self.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
match f(mutator) {
Ok(result) => {
self.0.execute("COMMIT", NO_PARAMS)?;
Ok(result)
}
Err(error) => {
match self.0.execute("ROLLBACK", NO_PARAMS) {
Ok(_) => Err(error),
Err(e) =>
// REVIEW: If rollback fails, what do we want to do? I think that
// panicking here is probably the right thing to do, because it
// means the database is corrupt?
panic!(
"Rollback failed with error {} while attempting to recover from error {}; database is likely corrupt.",
e,
error.0
)
}
}
}
}
}
pub struct DataConnStmtCache<'a> {
@ -333,6 +293,33 @@ impl<'a> DBUpdate for DataConnStmtCache<'a> {
type TxRef = i64;
type NoteRef = NoteId;
fn transactionally<F, A>(&mut self, f: F) -> Result<A, Self::Error>
where
F: FnOnce(&mut Self) -> Result<A, Self::Error>,
{
self.conn.0.execute("BEGIN IMMEDIATE", NO_PARAMS)?;
match f(self) {
Ok(result) => {
self.conn.0.execute("COMMIT", NO_PARAMS)?;
Ok(result)
}
Err(error) => {
match self.conn.0.execute("ROLLBACK", NO_PARAMS) {
Ok(_) => Err(error),
Err(e) =>
// REVIEW: If rollback fails, what do we want to do? I think that
// panicking here is probably the right thing to do, because it
// means the database is corrupt?
panic!(
"Rollback failed with error {} while attempting to recover from error {}; database is likely corrupt.",
e,
error.0
)
}
}
}
}
fn insert_block(
&mut self,
block_height: BlockHeight,
@ -356,6 +343,14 @@ impl<'a> DBUpdate for DataConnStmtCache<'a> {
Ok(())
}
fn rewind_to_height<P: consensus::Parameters>(
&mut self,
parameters: &P,
block_height: BlockHeight,
) -> Result<(), Self::Error> {
wallet::rewind_to_height(self.conn, parameters, block_height)
}
fn put_tx_meta(
&mut self,
tx: &WalletTx,
@ -530,7 +525,7 @@ impl<'a> DBUpdate for DataConnStmtCache<'a> {
AccountId(output.account as u32),
&RecipientAddress::Shielded(output.to.clone()),
Amount::from_u64(output.note.value)
.map_err(|_| Error::CorruptedData("Note value invalue."))?,
.map_err(|_| Error::CorruptedData("Note value invalid."))?,
Some(output.memo.clone()),
)?
}

View File

@ -5,14 +5,14 @@ use std::convert::TryInto;
use ff::PrimeField;
use zcash_primitives::{
consensus::{BlockHeight},
consensus::BlockHeight,
merkle_tree::IncrementalWitness,
primitives::{Diversifier, Rseed},
transaction::components::Amount,
};
use zcash_client_backend::{
data_api::{error::Error},
data_api::error::Error,
wallet::{AccountId, SpendableNote},
};