Database tweaks

This commit is contained in:
Hanh 2021-09-03 10:56:10 +08:00
parent 8f7628e088
commit 70ab12b9d1
5 changed files with 138 additions and 131 deletions

View File

@ -1,10 +1,10 @@
use crate::chain::{Nf, NfRef};
use crate::prices::Quote;
use crate::taddr::{derive_tkeys, BIP44_PATH};
use crate::taddr::derive_tkeys;
use crate::transaction::{Contact, TransactionInfo};
use crate::{CTree, Witness, NETWORK};
use chrono::NaiveDateTime;
use rusqlite::{params, Connection, OptionalExtension, NO_PARAMS};
use rusqlite::{params, Connection, OptionalExtension, NO_PARAMS, Transaction};
use std::collections::HashMap;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
@ -62,16 +62,16 @@ impl DbAdapter {
Ok(DbAdapter { connection })
}
pub fn begin_transaction(&self) -> anyhow::Result<()> {
self.connection.execute("BEGIN TRANSACTION", NO_PARAMS)?;
Ok(())
pub fn begin_transaction(&mut self) -> anyhow::Result<Transaction> {
let tx = self.connection.transaction()?;
Ok(tx)
}
pub fn commit(&self) -> anyhow::Result<()> {
self.connection.execute("COMMIT", NO_PARAMS)?;
Ok(())
}
//
// pub fn commit(&self) -> anyhow::Result<()> {
// self.connection.execute("COMMIT", NO_PARAMS)?;
// Ok(())
// }
//
pub fn init_db(&self) -> anyhow::Result<()> {
migration::init_db(&self.connection)?;
Ok(())
@ -186,21 +186,21 @@ impl DbAdapter {
}
pub fn store_transaction(
&self,
txid: &[u8],
account: u32,
height: u32,
timestamp: u32,
tx_index: u32,
db_tx: &Transaction
) -> anyhow::Result<u32> {
log::debug!("+transaction");
self.connection.execute(
db_tx.execute(
"INSERT INTO transactions(account, txid, height, timestamp, tx_index, value)
VALUES (?1, ?2, ?3, ?4, ?5, 0)
ON CONFLICT DO NOTHING",
params![account, txid, height, timestamp, tx_index],
)?;
let id_tx: u32 = self.connection.query_row(
let id_tx: u32 = db_tx.query_row(
"SELECT id_tx FROM transactions WHERE account = ?1 AND txid = ?2",
params![account, txid],
|row| row.get(0),
@ -210,16 +210,16 @@ impl DbAdapter {
}
pub fn store_received_note(
&self,
note: &ReceivedNote,
id_tx: u32,
position: usize,
db_tx: &Transaction
) -> anyhow::Result<u32> {
log::debug!("+received_note {}", id_tx);
self.connection.execute("INSERT INTO received_notes(account, tx, height, position, output_index, diversifier, value, rcm, nf, spent)
db_tx.execute("INSERT INTO received_notes(account, tx, height, position, output_index, diversifier, value, rcm, nf, spent)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
ON CONFLICT DO NOTHING", params![note.account, id_tx, note.height, position as u32, note.output_index, note.diversifier, note.value as i64, note.rcm, note.nf, note.spent])?;
let id_note: u32 = self.connection.query_row(
let id_note: u32 = db_tx.query_row(
"SELECT id_note FROM received_notes WHERE tx = ?1 AND output_index = ?2",
params![id_tx, note.output_index],
|row| row.get(0),
@ -254,16 +254,16 @@ impl DbAdapter {
Ok(())
}
pub fn add_value(&self, id_tx: u32, value: i64) -> anyhow::Result<()> {
self.connection.execute(
pub fn add_value(id_tx: u32, value: i64, db_tx: &Transaction) -> anyhow::Result<()> {
db_tx.execute(
"UPDATE transactions SET value = value + ?2 WHERE id_tx = ?1",
params![id_tx, value],
)?;
Ok(())
}
pub fn get_received_note_value(&self, nf: &Nf) -> anyhow::Result<(u32, i64)> {
let (account, value) = self.connection.query_row(
pub fn get_received_note_value(nf: &Nf, db_tx: &Transaction) -> anyhow::Result<(u32, i64)> {
let (account, value) = db_tx.query_row(
"SELECT account, value FROM received_notes WHERE nf = ?1",
params![nf.0.to_vec()],
|row| {
@ -455,9 +455,9 @@ impl DbAdapter {
Ok(spendable_notes)
}
pub fn mark_spent(&self, id: u32, height: u32) -> anyhow::Result<()> {
pub fn mark_spent(id: u32, height: u32, tx: &Transaction) -> anyhow::Result<()> {
log::debug!("+mark_spent");
self.connection.execute(
tx.execute(
"UPDATE received_notes SET spent = ?1 WHERE id_note = ?2",
params![height, id],
)?;
@ -632,7 +632,8 @@ impl DbAdapter {
pub fn create_taddr(&self, account: u32) -> anyhow::Result<()> {
let seed = self.get_seed(account)?;
if let Some(seed) = seed {
let (sk, address) = derive_tkeys(&seed, BIP44_PATH)?;
let bip44_path = format!("m/44'/{}'/0'/0/0", NETWORK.coin_type());
let (sk, address) = derive_tkeys(&seed, &bip44_path)?;
self.connection.execute(
"INSERT INTO taddrs(account, sk, address) VALUES (?1, ?2, ?3) \
ON CONFLICT DO NOTHING",
@ -704,8 +705,9 @@ mod tests {
db.trim_to_height(0).unwrap();
db.store_block(1, &[0u8; 32], 0, &CTree::new()).unwrap();
let id_tx = db.store_transaction(&[0; 32], 1, 1, 0, 20).unwrap();
db.store_received_note(
let db_tx = db.begin_transaction().unwrap();
let id_tx = DbAdapter::store_transaction(&[0; 32], 1, 1, 0, 20, &db_tx).unwrap();
DbAdapter::store_received_note(
&ReceivedNote {
account: 1,
height: 1,
@ -718,6 +720,7 @@ mod tests {
},
id_tx,
5,
&db_tx
)
.unwrap();
let witness = Witness {
@ -728,6 +731,7 @@ mod tests {
filled: vec![],
cursor: CTree::new(),
};
db_tx.commit().unwrap();
db.store_witnesses(&witness, 1000, 1).unwrap();
}

View File

@ -135,7 +135,7 @@ pub async fn sync_async(
let proc_callback = progress_callback.clone();
let processor = tokio::spawn(async move {
let db = DbAdapter::new(&db_path)?;
let mut db = DbAdapter::new(&db_path)?;
let mut nfs = db.get_nullifiers()?;
let (mut tree, mut witnesses) = db.get_tree()?;
@ -148,116 +148,123 @@ pub async fn sync_async(
continue;
}
db.begin_transaction()?;
let mut my_tx_ids: HashMap<TxIdHeight, u32> = HashMap::new();
let mut new_ids_tx: Vec<u32> = vec![];
let dec_blocks = decrypter.decrypt_blocks(&blocks.0);
let mut witnesses: Vec<Witness> = vec![];
log::info!("Dec start : {}", dec_blocks[0].height);
let start = Instant::now();
for b in dec_blocks.iter() {
let mut my_nfs: Vec<Nf> = vec![];
for nf in b.spends.iter() {
if let Some(&nf_ref) = nfs.get(nf) {
log::info!("NF FOUND {} {}", nf_ref.id_note, b.height);
db.mark_spent(nf_ref.id_note, b.height)?;
my_nfs.push(*nf);
nfs.remove(nf);
{ // db tx scope
let db_tx = db.begin_transaction()?;
let dec_blocks = decrypter.decrypt_blocks(&blocks.0);
log::info!("Dec start : {}", dec_blocks[0].height);
let start = Instant::now();
for b in dec_blocks.iter() {
let mut my_nfs: Vec<Nf> = vec![];
for nf in b.spends.iter() {
if let Some(&nf_ref) = nfs.get(nf) {
log::info!("NF FOUND {} {}", nf_ref.id_note, b.height);
DbAdapter::mark_spent(nf_ref.id_note, b.height, &db_tx)?;
my_nfs.push(*nf);
nfs.remove(nf);
}
}
if !b.notes.is_empty() {
log::debug!("{} {}", b.height, b.notes.len());
}
}
if !b.notes.is_empty() {
log::debug!("{} {}", b.height, b.notes.len());
}
for n in b.notes.iter() {
let p = absolute_position_at_block_start + n.position_in_block;
for n in b.notes.iter() {
let p = absolute_position_at_block_start + n.position_in_block;
let note = &n.note;
let rcm = note.rcm().to_repr();
let nf = note.nf(&n.ivk.fvk.vk, p as u64);
let note = &n.note;
let rcm = note.rcm().to_repr();
let nf = note.nf(&n.ivk.fvk.vk, p as u64);
let id_tx = db.store_transaction(
&n.txid,
n.account,
n.height,
b.compact_block.time,
n.tx_index as u32,
)?;
my_tx_ids.insert(
TxIdHeight {
height: n.height,
index: n.tx_index as u32,
},
id_tx,
);
let id_note = db.store_received_note(
&ReceivedNote {
account: n.account,
height: n.height,
output_index: n.output_index as u32,
diversifier: n.pa.diversifier().0.to_vec(),
value: note.value,
rcm: rcm.to_vec(),
nf: nf.0.to_vec(),
spent: None,
},
id_tx,
n.position_in_block,
)?;
db.add_value(id_tx, note.value as i64)?;
nfs.insert(
Nf(nf.0),
NfRef {
id_note,
account: n.account,
},
);
let id_tx = DbAdapter::store_transaction(
&n.txid,
n.account,
n.height,
b.compact_block.time,
n.tx_index as u32,
&db_tx,
)?;
my_tx_ids.insert(
TxIdHeight {
height: n.height,
index: n.tx_index as u32,
},
id_tx,
);
let id_note = DbAdapter::store_received_note(
&ReceivedNote {
account: n.account,
height: n.height,
output_index: n.output_index as u32,
diversifier: n.pa.diversifier().0.to_vec(),
value: note.value,
rcm: rcm.to_vec(),
nf: nf.0.to_vec(),
spent: None,
},
id_tx,
n.position_in_block,
&db_tx
)?;
DbAdapter::add_value(id_tx, note.value as i64, &db_tx)?;
nfs.insert(
Nf(nf.0),
NfRef {
id_note,
account: n.account,
},
);
let w = Witness::new(p as usize, id_note, Some(n.clone()));
witnesses.push(w);
}
let w = Witness::new(p as usize, id_note, Some(n.clone()));
witnesses.push(w);
}
if !my_nfs.is_empty() || !my_tx_ids.is_empty() {
for (tx_index, tx) in b.compact_block.vtx.iter().enumerate() {
let mut added = false;
for cs in tx.spends.iter() {
let tx_id = TxIdHeight {
height: b.height,
index: tx_index as u32,
};
if let Some(&id_tx) = my_tx_ids.get(&tx_id) {
if !added {
new_ids_tx.push(id_tx);
added = true;
if !my_nfs.is_empty() || !my_tx_ids.is_empty() {
for (tx_index, tx) in b.compact_block.vtx.iter().enumerate() {
let mut added = false;
for cs in tx.spends.iter() {
let tx_id = TxIdHeight {
height: b.height,
index: tx_index as u32,
};
if let Some(&id_tx) = my_tx_ids.get(&tx_id) {
if !added {
new_ids_tx.push(id_tx);
added = true;
}
}
}
let mut nf = [0u8; 32];
nf.copy_from_slice(&cs.nf);
let nf = Nf(nf);
if my_nfs.contains(&nf) {
let (account, note_value) = db.get_received_note_value(&nf)?;
let txid = &*tx.hash;
let id_tx = db.store_transaction(
txid,
account,
b.height,
b.compact_block.time,
tx_index as u32,
)?;
if !added {
new_ids_tx.push(id_tx);
added = true;
let mut nf = [0u8; 32];
nf.copy_from_slice(&cs.nf);
let nf = Nf(nf);
if my_nfs.contains(&nf) {
let (account, note_value) = DbAdapter::get_received_note_value(&nf, &db_tx)?;
let txid = &*tx.hash;
let id_tx = DbAdapter::store_transaction(
txid,
account,
b.height,
b.compact_block.time,
tx_index as u32,
&db_tx
)?;
if !added {
new_ids_tx.push(id_tx);
added = true;
}
DbAdapter::add_value(id_tx, -(note_value as i64), &db_tx)?;
}
db.add_value(id_tx, -(note_value as i64))?;
}
}
}
}
absolute_position_at_block_start += b.count_outputs as usize;
absolute_position_at_block_start += b.count_outputs as usize;
}
log::info!("Dec end : {}", start.elapsed().as_millis());
db_tx.commit()?;
}
log::info!("Dec end : {}", start.elapsed().as_millis());
let start = Instant::now();
let mut nodes: Vec<Node> = vec![];
@ -288,8 +295,6 @@ pub async fn sync_async(
}
log::info!("Witness : {}", start.elapsed().as_millis());
db.commit()?;
let start = Instant::now();
if get_tx && !my_tx_ids.is_empty() {
retrieve_tx_info(&mut client, &db_path2, &new_ids_tx)

View File

@ -23,8 +23,6 @@ use zcash_primitives::transaction::components::{Amount, OutPoint, TxOut};
use zcash_proofs::prover::LocalTxProver;
use rand::rngs::OsRng;
pub const BIP44_PATH: &str = "m/44'/133'/0'/0/0";
pub async fn get_taddr_balance(
client: &mut CompactTxStreamerClient<Channel>,
address: &str,

View File

@ -136,7 +136,7 @@ pub async fn retrieve_tx_info(
tx_ids: &[u32],
) -> anyhow::Result<()> {
let db = DbAdapter::new(db_path)?;
db.begin_transaction()?;
let nfs = db.get_nullifiers_raw()?;
let mut nf_map: HashMap<(u32, Vec<u8>), u64> = HashMap::new();
for nf in nfs.iter() {
@ -203,7 +203,6 @@ pub async fn retrieve_tx_info(
for c in contacts.iter() {
db.store_contact(c)?;
}
db.commit()?;
Ok::<_, anyhow::Error>(())
});

View File

@ -203,7 +203,7 @@ impl Wallet {
}
pub async fn send_multi_payment(
&self,
&mut self,
account: u32,
recipients_json: &str,
anchor_offset: u32,
@ -252,7 +252,7 @@ impl Wallet {
}
pub async fn send_payment(
&self,
&mut self,
account: u32,
to_address: &str,
amount: u64,
@ -268,7 +268,7 @@ impl Wallet {
}
async fn _send_payment(
&self,
&mut self,
account: u32,
recipients: &[Recipient],
anchor_offset: u32,
@ -324,8 +324,9 @@ impl Wallet {
let tx_id = send_transaction(&mut client, &raw_tx, last_height).await?;
log::info!("Tx ID = {}", tx_id);
let db_tx = self.db.begin_transaction()?;
for id_note in selected_notes.iter() {
self.db.mark_spent(*id_note, 0)?;
DbAdapter::mark_spent(*id_note, 0, &db_tx)?;
}
Ok(tx_id)
}