From 70ab12b9d18c18d3d30d3e81fcdd031144a07303 Mon Sep 17 00:00:00 2001 From: Hanh Date: Fri, 3 Sep 2021 10:56:10 +0800 Subject: [PATCH] Database tweaks --- src/db.rs | 56 +++++++------ src/scan.rs | 199 +++++++++++++++++++++++---------------------- src/taddr.rs | 2 - src/transaction.rs | 3 +- src/wallet.rs | 9 +- 5 files changed, 138 insertions(+), 131 deletions(-) diff --git a/src/db.rs b/src/db.rs index 326a214..4edfabc 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,10 +1,10 @@ use crate::chain::{Nf, NfRef}; use crate::prices::Quote; -use crate::taddr::{derive_tkeys, BIP44_PATH}; +use crate::taddr::derive_tkeys; use crate::transaction::{Contact, TransactionInfo}; use crate::{CTree, Witness, NETWORK}; use chrono::NaiveDateTime; -use rusqlite::{params, Connection, OptionalExtension, NO_PARAMS}; +use rusqlite::{params, Connection, OptionalExtension, NO_PARAMS, Transaction}; use std::collections::HashMap; use zcash_client_backend::encoding::decode_extended_full_viewing_key; use zcash_primitives::consensus::{NetworkUpgrade, Parameters}; @@ -62,16 +62,16 @@ impl DbAdapter { Ok(DbAdapter { connection }) } - pub fn begin_transaction(&self) -> anyhow::Result<()> { - self.connection.execute("BEGIN TRANSACTION", NO_PARAMS)?; - Ok(()) + pub fn begin_transaction(&mut self) -> anyhow::Result { + let tx = self.connection.transaction()?; + Ok(tx) } - - pub fn commit(&self) -> anyhow::Result<()> { - self.connection.execute("COMMIT", NO_PARAMS)?; - Ok(()) - } - + // + // pub fn commit(&self) -> anyhow::Result<()> { + // self.connection.execute("COMMIT", NO_PARAMS)?; + // Ok(()) + // } + // pub fn init_db(&self) -> anyhow::Result<()> { migration::init_db(&self.connection)?; Ok(()) @@ -186,21 +186,21 @@ impl DbAdapter { } pub fn store_transaction( - &self, txid: &[u8], account: u32, height: u32, timestamp: u32, tx_index: u32, + db_tx: &Transaction ) -> anyhow::Result { log::debug!("+transaction"); - self.connection.execute( + db_tx.execute( "INSERT INTO transactions(account, txid, height, timestamp, tx_index, value) VALUES (?1, ?2, ?3, ?4, ?5, 0) ON CONFLICT DO NOTHING", params![account, txid, height, timestamp, tx_index], )?; - let id_tx: u32 = self.connection.query_row( + let id_tx: u32 = db_tx.query_row( "SELECT id_tx FROM transactions WHERE account = ?1 AND txid = ?2", params![account, txid], |row| row.get(0), @@ -210,16 +210,16 @@ impl DbAdapter { } pub fn store_received_note( - &self, note: &ReceivedNote, id_tx: u32, position: usize, + db_tx: &Transaction ) -> anyhow::Result { log::debug!("+received_note {}", id_tx); - self.connection.execute("INSERT INTO received_notes(account, tx, height, position, output_index, diversifier, value, rcm, nf, spent) + db_tx.execute("INSERT INTO received_notes(account, tx, height, position, output_index, diversifier, value, rcm, nf, spent) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) ON CONFLICT DO NOTHING", params![note.account, id_tx, note.height, position as u32, note.output_index, note.diversifier, note.value as i64, note.rcm, note.nf, note.spent])?; - let id_note: u32 = self.connection.query_row( + let id_note: u32 = db_tx.query_row( "SELECT id_note FROM received_notes WHERE tx = ?1 AND output_index = ?2", params![id_tx, note.output_index], |row| row.get(0), @@ -254,16 +254,16 @@ impl DbAdapter { Ok(()) } - pub fn add_value(&self, id_tx: u32, value: i64) -> anyhow::Result<()> { - self.connection.execute( + pub fn add_value(id_tx: u32, value: i64, db_tx: &Transaction) -> anyhow::Result<()> { + db_tx.execute( "UPDATE transactions SET value = value + ?2 WHERE id_tx = ?1", params![id_tx, value], )?; Ok(()) } - pub fn get_received_note_value(&self, nf: &Nf) -> anyhow::Result<(u32, i64)> { - let (account, value) = self.connection.query_row( + pub fn get_received_note_value(nf: &Nf, db_tx: &Transaction) -> anyhow::Result<(u32, i64)> { + let (account, value) = db_tx.query_row( "SELECT account, value FROM received_notes WHERE nf = ?1", params![nf.0.to_vec()], |row| { @@ -455,9 +455,9 @@ impl DbAdapter { Ok(spendable_notes) } - pub fn mark_spent(&self, id: u32, height: u32) -> anyhow::Result<()> { + pub fn mark_spent(id: u32, height: u32, tx: &Transaction) -> anyhow::Result<()> { log::debug!("+mark_spent"); - self.connection.execute( + tx.execute( "UPDATE received_notes SET spent = ?1 WHERE id_note = ?2", params![height, id], )?; @@ -632,7 +632,8 @@ impl DbAdapter { pub fn create_taddr(&self, account: u32) -> anyhow::Result<()> { let seed = self.get_seed(account)?; if let Some(seed) = seed { - let (sk, address) = derive_tkeys(&seed, BIP44_PATH)?; + let bip44_path = format!("m/44'/{}'/0'/0/0", NETWORK.coin_type()); + let (sk, address) = derive_tkeys(&seed, &bip44_path)?; self.connection.execute( "INSERT INTO taddrs(account, sk, address) VALUES (?1, ?2, ?3) \ ON CONFLICT DO NOTHING", @@ -704,8 +705,9 @@ mod tests { db.trim_to_height(0).unwrap(); db.store_block(1, &[0u8; 32], 0, &CTree::new()).unwrap(); - let id_tx = db.store_transaction(&[0; 32], 1, 1, 0, 20).unwrap(); - db.store_received_note( + let db_tx = db.begin_transaction().unwrap(); + let id_tx = DbAdapter::store_transaction(&[0; 32], 1, 1, 0, 20, &db_tx).unwrap(); + DbAdapter::store_received_note( &ReceivedNote { account: 1, height: 1, @@ -718,6 +720,7 @@ mod tests { }, id_tx, 5, + &db_tx ) .unwrap(); let witness = Witness { @@ -728,6 +731,7 @@ mod tests { filled: vec![], cursor: CTree::new(), }; + db_tx.commit().unwrap(); db.store_witnesses(&witness, 1000, 1).unwrap(); } diff --git a/src/scan.rs b/src/scan.rs index fdd7a8c..ad4d944 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -135,7 +135,7 @@ pub async fn sync_async( let proc_callback = progress_callback.clone(); let processor = tokio::spawn(async move { - let db = DbAdapter::new(&db_path)?; + let mut db = DbAdapter::new(&db_path)?; let mut nfs = db.get_nullifiers()?; let (mut tree, mut witnesses) = db.get_tree()?; @@ -148,116 +148,123 @@ pub async fn sync_async( continue; } - db.begin_transaction()?; - let mut my_tx_ids: HashMap = HashMap::new(); let mut new_ids_tx: Vec = vec![]; - let dec_blocks = decrypter.decrypt_blocks(&blocks.0); let mut witnesses: Vec = vec![]; - log::info!("Dec start : {}", dec_blocks[0].height); - let start = Instant::now(); - for b in dec_blocks.iter() { - let mut my_nfs: Vec = vec![]; - for nf in b.spends.iter() { - if let Some(&nf_ref) = nfs.get(nf) { - log::info!("NF FOUND {} {}", nf_ref.id_note, b.height); - db.mark_spent(nf_ref.id_note, b.height)?; - my_nfs.push(*nf); - nfs.remove(nf); + + { // db tx scope + let db_tx = db.begin_transaction()?; + let dec_blocks = decrypter.decrypt_blocks(&blocks.0); + log::info!("Dec start : {}", dec_blocks[0].height); + let start = Instant::now(); + for b in dec_blocks.iter() { + let mut my_nfs: Vec = vec![]; + for nf in b.spends.iter() { + if let Some(&nf_ref) = nfs.get(nf) { + log::info!("NF FOUND {} {}", nf_ref.id_note, b.height); + DbAdapter::mark_spent(nf_ref.id_note, b.height, &db_tx)?; + my_nfs.push(*nf); + nfs.remove(nf); + } + } + if !b.notes.is_empty() { + log::debug!("{} {}", b.height, b.notes.len()); } - } - if !b.notes.is_empty() { - log::debug!("{} {}", b.height, b.notes.len()); - } - for n in b.notes.iter() { - let p = absolute_position_at_block_start + n.position_in_block; + for n in b.notes.iter() { + let p = absolute_position_at_block_start + n.position_in_block; - let note = &n.note; - let rcm = note.rcm().to_repr(); - let nf = note.nf(&n.ivk.fvk.vk, p as u64); + let note = &n.note; + let rcm = note.rcm().to_repr(); + let nf = note.nf(&n.ivk.fvk.vk, p as u64); - let id_tx = db.store_transaction( - &n.txid, - n.account, - n.height, - b.compact_block.time, - n.tx_index as u32, - )?; - my_tx_ids.insert( - TxIdHeight { - height: n.height, - index: n.tx_index as u32, - }, - id_tx, - ); - let id_note = db.store_received_note( - &ReceivedNote { - account: n.account, - height: n.height, - output_index: n.output_index as u32, - diversifier: n.pa.diversifier().0.to_vec(), - value: note.value, - rcm: rcm.to_vec(), - nf: nf.0.to_vec(), - spent: None, - }, - id_tx, - n.position_in_block, - )?; - db.add_value(id_tx, note.value as i64)?; - nfs.insert( - Nf(nf.0), - NfRef { - id_note, - account: n.account, - }, - ); + let id_tx = DbAdapter::store_transaction( + &n.txid, + n.account, + n.height, + b.compact_block.time, + n.tx_index as u32, + &db_tx, + )?; + my_tx_ids.insert( + TxIdHeight { + height: n.height, + index: n.tx_index as u32, + }, + id_tx, + ); + let id_note = DbAdapter::store_received_note( + &ReceivedNote { + account: n.account, + height: n.height, + output_index: n.output_index as u32, + diversifier: n.pa.diversifier().0.to_vec(), + value: note.value, + rcm: rcm.to_vec(), + nf: nf.0.to_vec(), + spent: None, + }, + id_tx, + n.position_in_block, + &db_tx + )?; + DbAdapter::add_value(id_tx, note.value as i64, &db_tx)?; + nfs.insert( + Nf(nf.0), + NfRef { + id_note, + account: n.account, + }, + ); - let w = Witness::new(p as usize, id_note, Some(n.clone())); - witnesses.push(w); - } + let w = Witness::new(p as usize, id_note, Some(n.clone())); + witnesses.push(w); + } - if !my_nfs.is_empty() || !my_tx_ids.is_empty() { - for (tx_index, tx) in b.compact_block.vtx.iter().enumerate() { - let mut added = false; - for cs in tx.spends.iter() { - let tx_id = TxIdHeight { - height: b.height, - index: tx_index as u32, - }; - if let Some(&id_tx) = my_tx_ids.get(&tx_id) { - if !added { - new_ids_tx.push(id_tx); - added = true; + if !my_nfs.is_empty() || !my_tx_ids.is_empty() { + for (tx_index, tx) in b.compact_block.vtx.iter().enumerate() { + let mut added = false; + for cs in tx.spends.iter() { + let tx_id = TxIdHeight { + height: b.height, + index: tx_index as u32, + }; + if let Some(&id_tx) = my_tx_ids.get(&tx_id) { + if !added { + new_ids_tx.push(id_tx); + added = true; + } } - } - let mut nf = [0u8; 32]; - nf.copy_from_slice(&cs.nf); - let nf = Nf(nf); - if my_nfs.contains(&nf) { - let (account, note_value) = db.get_received_note_value(&nf)?; - let txid = &*tx.hash; - let id_tx = db.store_transaction( - txid, - account, - b.height, - b.compact_block.time, - tx_index as u32, - )?; - if !added { - new_ids_tx.push(id_tx); - added = true; + let mut nf = [0u8; 32]; + nf.copy_from_slice(&cs.nf); + let nf = Nf(nf); + if my_nfs.contains(&nf) { + let (account, note_value) = DbAdapter::get_received_note_value(&nf, &db_tx)?; + let txid = &*tx.hash; + let id_tx = DbAdapter::store_transaction( + txid, + account, + b.height, + b.compact_block.time, + tx_index as u32, + &db_tx + )?; + if !added { + new_ids_tx.push(id_tx); + added = true; + } + DbAdapter::add_value(id_tx, -(note_value as i64), &db_tx)?; } - db.add_value(id_tx, -(note_value as i64))?; } } } - } - absolute_position_at_block_start += b.count_outputs as usize; + absolute_position_at_block_start += b.count_outputs as usize; + } + log::info!("Dec end : {}", start.elapsed().as_millis()); + + db_tx.commit()?; } - log::info!("Dec end : {}", start.elapsed().as_millis()); let start = Instant::now(); let mut nodes: Vec = vec![]; @@ -288,8 +295,6 @@ pub async fn sync_async( } log::info!("Witness : {}", start.elapsed().as_millis()); - db.commit()?; - let start = Instant::now(); if get_tx && !my_tx_ids.is_empty() { retrieve_tx_info(&mut client, &db_path2, &new_ids_tx) diff --git a/src/taddr.rs b/src/taddr.rs index f5c3e0f..228346a 100644 --- a/src/taddr.rs +++ b/src/taddr.rs @@ -23,8 +23,6 @@ use zcash_primitives::transaction::components::{Amount, OutPoint, TxOut}; use zcash_proofs::prover::LocalTxProver; use rand::rngs::OsRng; -pub const BIP44_PATH: &str = "m/44'/133'/0'/0/0"; - pub async fn get_taddr_balance( client: &mut CompactTxStreamerClient, address: &str, diff --git a/src/transaction.rs b/src/transaction.rs index da25952..7406e49 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -136,7 +136,7 @@ pub async fn retrieve_tx_info( tx_ids: &[u32], ) -> anyhow::Result<()> { let db = DbAdapter::new(db_path)?; - db.begin_transaction()?; + let nfs = db.get_nullifiers_raw()?; let mut nf_map: HashMap<(u32, Vec), u64> = HashMap::new(); for nf in nfs.iter() { @@ -203,7 +203,6 @@ pub async fn retrieve_tx_info( for c in contacts.iter() { db.store_contact(c)?; } - db.commit()?; Ok::<_, anyhow::Error>(()) }); diff --git a/src/wallet.rs b/src/wallet.rs index 3b8c057..18d47c5 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -203,7 +203,7 @@ impl Wallet { } pub async fn send_multi_payment( - &self, + &mut self, account: u32, recipients_json: &str, anchor_offset: u32, @@ -252,7 +252,7 @@ impl Wallet { } pub async fn send_payment( - &self, + &mut self, account: u32, to_address: &str, amount: u64, @@ -268,7 +268,7 @@ impl Wallet { } async fn _send_payment( - &self, + &mut self, account: u32, recipients: &[Recipient], anchor_offset: u32, @@ -324,8 +324,9 @@ impl Wallet { let tx_id = send_transaction(&mut client, &raw_tx, last_height).await?; log::info!("Tx ID = {}", tx_id); + let db_tx = self.db.begin_transaction()?; for id_note in selected_notes.iter() { - self.db.mark_spent(*id_note, 0)?; + DbAdapter::mark_spent(*id_note, 0, &db_tx)?; } Ok(tx_id) }