Performance optimizations

This commit is contained in:
Hanh 2021-07-28 13:07:20 +08:00
parent e5fdc41ae9
commit 632792ffba
6 changed files with 122 additions and 89 deletions

View File

@ -11,13 +11,13 @@ use std::time::Instant;
use thiserror::Error;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{BlockHeight, NetworkUpgrade, Parameters};
use zcash_primitives::merkle_tree::{CommitmentTree, IncrementalWitness};
use zcash_primitives::sapling::note_encryption::try_sapling_compact_note_decryption;
use zcash_primitives::sapling::{Node, Note, PaymentAddress};
use zcash_primitives::transaction::components::sapling::CompactOutputDescription;
use zcash_primitives::zip32::ExtendedFullViewingKey;
use crate::db::AccountViewKey;
const MAX_CHUNK: u32 = 50000;
@ -78,7 +78,7 @@ pub async fn download_chain(
}
pub struct DecryptNode {
fvks: HashMap<u32, ExtendedFullViewingKey>,
vks: HashMap<u32, AccountViewKey>,
}
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
@ -105,6 +105,7 @@ pub struct DecryptedNote {
pub note: Note,
pub pa: PaymentAddress,
pub position_in_block: usize,
pub viewonly: bool,
pub height: u32,
pub txid: Vec<u8>,
@ -129,7 +130,7 @@ pub fn to_output_description(co: &CompactOutput) -> CompactOutputDescription {
fn decrypt_notes<'a>(
block: &'a CompactBlock,
fvks: &HashMap<u32, ExtendedFullViewingKey>,
vks: &HashMap<u32, AccountViewKey>
) -> DecryptedBlock<'a> {
let height = BlockHeight::from_u32(block.height as u32);
let mut count_outputs = 0u32;
@ -143,17 +144,17 @@ fn decrypt_notes<'a>(
}
for (output_index, co) in vtx.outputs.iter().enumerate() {
for (&account, fvk) in fvks.iter() {
let ivk = &fvk.fvk.vk.ivk();
for (&account, vk) in vks.iter() {
let od = to_output_description(co);
if let Some((note, pa)) =
try_sapling_compact_note_decryption(&NETWORK, height, ivk, &od)
try_sapling_compact_note_decryption(&NETWORK, height, &vk.ivk, &od)
{
notes.push(DecryptedNote {
account,
ivk: fvk.clone(),
ivk: vk.fvk.clone(),
note,
pa,
viewonly: vk.viewonly,
position_in_block: count_outputs as usize,
height: block.height as u32,
tx_index,
@ -175,14 +176,14 @@ fn decrypt_notes<'a>(
}
impl DecryptNode {
pub fn new(fvks: HashMap<u32, ExtendedFullViewingKey>) -> DecryptNode {
DecryptNode { fvks }
pub fn new(vks: HashMap<u32, AccountViewKey>) -> DecryptNode {
DecryptNode { vks }
}
pub fn decrypt_blocks<'a>(&self, blocks: &'a [CompactBlock]) -> Vec<DecryptedBlock<'a>> {
let mut decrypted_blocks: Vec<DecryptedBlock> = blocks
.par_iter()
.map(|b| decrypt_notes(b, &self.fvks))
.map(|b| decrypt_notes(b, &self.vks))
.collect();
decrypted_blocks.sort_by(|a, b| a.height.cmp(&b.height));
decrypted_blocks
@ -327,20 +328,8 @@ pub async fn connect_lightwalletd(url: &str) -> anyhow::Result<CompactTxStreamer
Ok(client)
}
pub async fn sync(fvks: &HashMap<u32, String>, ld_url: &str) -> anyhow::Result<()> {
let fvks: HashMap<_, _> = fvks
.iter()
.map(|(&account, fvk)| {
let fvk = decode_extended_full_viewing_key(
NETWORK.hrp_sapling_extended_full_viewing_key(),
&fvk,
)
.unwrap()
.unwrap();
(account, fvk)
})
.collect();
let decrypter = DecryptNode::new(fvks);
pub async fn sync(vks: HashMap<u32, AccountViewKey>, ld_url: &str) -> anyhow::Result<()> {
let decrypter = DecryptNode::new(vks);
let mut client = connect_lightwalletd(ld_url).await?;
let start_height: u32 = crate::NETWORK
.activation_height(NetworkUpgrade::Sapling)
@ -385,7 +374,7 @@ mod tests {
use std::time::Instant;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use zcash_primitives::zip32::ExtendedFullViewingKey;
use crate::db::AccountViewKey;
#[tokio::test]
async fn test_get_latest_height() -> anyhow::Result<()> {
@ -400,12 +389,12 @@ mod tests {
dotenv::dotenv().unwrap();
let fvk = dotenv::var("FVK").unwrap();
let mut fvks: HashMap<u32, ExtendedFullViewingKey> = HashMap::new();
let mut fvks: HashMap<u32, AccountViewKey> = HashMap::new();
let fvk =
decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &fvk)
.unwrap()
.unwrap();
fvks.insert(1, fvk);
fvks.insert(1, AccountViewKey::from_fvk(&fvk));
let decrypter = DecryptNode::new(fvks);
let mut client = CompactTxStreamerClient::connect(LWD_URL).await?;
let start_height: u32 = crate::NETWORK

View File

@ -2,13 +2,14 @@ use crate::chain::{Nf, NfRef};
use crate::db::migration::{get_schema_version, update_schema_version};
use crate::taddr::{derive_tkeys, BIP44_PATH};
use crate::transaction::{TransactionInfo, Contact};
use crate::{CTree, Witness};
use crate::{CTree, Witness, NETWORK};
use rusqlite::{params, Connection, OptionalExtension, NO_PARAMS};
use std::collections::HashMap;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use zcash_primitives::merkle_tree::IncrementalWitness;
use zcash_primitives::sapling::{Diversifier, Node, Note, Rseed};
use zcash_primitives::sapling::{Diversifier, Node, Note, Rseed, SaplingIvk};
use zcash_primitives::zip32::{DiversifierIndex, ExtendedFullViewingKey};
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
mod migration;
@ -37,12 +38,43 @@ pub struct SpendableNote {
pub witness: IncrementalWitness<Node>,
}
pub struct AccountViewKey {
pub fvk: ExtendedFullViewingKey,
pub ivk: SaplingIvk,
pub viewonly: bool,
}
impl AccountViewKey {
pub fn from_fvk(fvk: &ExtendedFullViewingKey) -> AccountViewKey {
AccountViewKey {
fvk: fvk.clone(),
ivk: fvk.fvk.vk.ivk(),
viewonly: false,
}
}
}
impl DbAdapter {
pub fn new(db_path: &str) -> anyhow::Result<DbAdapter> {
let connection = Connection::open(db_path)?;
Ok(DbAdapter { connection })
}
pub fn synchronous(&self, flag: bool) -> anyhow::Result<()> {
self.connection.execute(&format!("PRAGMA synchronous = {}", if flag { "on" } else { "off" }), NO_PARAMS)?;
Ok(())
}
pub fn begin_transaction(&self) -> anyhow::Result<()> {
self.connection.execute("BEGIN TRANSACTION", NO_PARAMS)?;
Ok(())
}
pub fn commit(&self) -> anyhow::Result<()> {
self.connection.execute("COMMIT", NO_PARAMS)?;
Ok(())
}
pub fn init_db(&self) -> anyhow::Result<()> {
self.connection.execute(
"CREATE TABLE IF NOT EXISTS schema_version (
@ -166,23 +198,26 @@ impl DbAdapter {
params![name, seed, sk, ivk, address],
)?;
let id_tx: u32 = self.connection.query_row(
"SELECT id_account FROM accounts WHERE sk = ?1",
params![sk],
"SELECT id_account FROM accounts WHERE ivk = ?1",
params![ivk],
|row| row.get(0),
)?;
Ok(id_tx)
}
pub fn get_fvks(&self) -> anyhow::Result<HashMap<u32, String>> {
pub fn get_fvks(&self) -> anyhow::Result<HashMap<u32, AccountViewKey>> {
let mut statement = self
.connection
.prepare("SELECT id_account, ivk FROM accounts")?;
.prepare("SELECT id_account, ivk, sk FROM accounts")?;
let rows = statement.query_map(NO_PARAMS, |row| {
let account: u32 = row.get(0)?;
let ivk: String = row.get(1)?;
Ok((account, ivk))
let sk: Option<String> = row.get(2)?;
let fvk = decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &ivk).unwrap().unwrap();
let ivk = fvk.fvk.vk.ivk();
Ok((account, AccountViewKey { fvk, ivk, viewonly: sk.is_none() }))
})?;
let mut fvks: HashMap<u32, String> = HashMap::new();
let mut fvks: HashMap<u32, AccountViewKey> = HashMap::new();
for r in rows {
let row = r?;
fvks.insert(row.0, row.1);
@ -577,11 +612,10 @@ impl DbAdapter {
"SELECT seed FROM accounts WHERE id_account = ?1",
params![account],
|row| {
let sk: String = row.get(0)?;
let sk: Option<String> = row.get(0)?;
Ok(sk)
},
)
.optional()?;
)?;
log::info!("-get_seed");
Ok(seed)
}

View File

@ -1,6 +1,6 @@
use rustyline::Editor;
use rustyline::error::ReadlineError;
use clap::{AppSettings, Clap, App};
use clap::{AppSettings, Clap};
#[derive(Clap, Debug)]
#[clap(setting = AppSettings::NoBinaryName)]

View File

@ -24,13 +24,15 @@ async fn test() -> anyhow::Result<()> {
let seed = dotenv::var("SEED").unwrap();
let seed2 = dotenv::var("SEED2").unwrap();
let ivk = dotenv::var("IVK").unwrap();
let address = dotenv::var("ADDRESS").unwrap();
let progress = |height| {
log::info!("Height = {}", height);
};
let wallet = Wallet::new(DB_NAME, LWD_URL);
wallet.new_account_with_key("main", &seed).unwrap();
wallet.new_account_with_key("test", &seed2).unwrap();
// wallet.new_account_with_key("main", &seed).unwrap();
// wallet.new_account_with_key("test", &seed2).unwrap();
wallet.new_account_with_key("zecpages", &ivk).unwrap();
let res = wallet.sync(true, ANCHOR_OFFSET, progress).await;
if let Err(err) = res {
@ -40,21 +42,21 @@ async fn test() -> anyhow::Result<()> {
panic!("{}", err);
}
}
let tx_id = wallet
.send_payment(
1,
&address,
50000,
"test memo",
u64::max_value(),
2,
move |progress| {
println!("{}", progress.cur());
},
)
.await
.unwrap();
println!("TXID = {}", tx_id);
// let tx_id = wallet
// .send_payment(
// 1,
// &address,
// 50000,
// "test memo",
// u64::max_value(),
// 2,
// move |progress| {
// println!("{}", progress.cur());
// },
// )
// .await
// .unwrap();
// println!("TXID = {}", tx_id);
Ok(())
}

View File

@ -1,11 +1,11 @@
use crate::builder::BlockProcessor;
use crate::chain::{Nf, NfRef};
use crate::db::{DbAdapter, ReceivedNote};
use crate::db::{DbAdapter, ReceivedNote, AccountViewKey};
use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient;
use crate::transaction::retrieve_tx_info;
use crate::{
calculate_tree_state_v2, connect_lightwalletd, download_chain, get_latest_height, CompactBlock,
DecryptNode, Witness, LWD_URL, NETWORK,
DecryptNode, Witness, LWD_URL,
};
use ff::PrimeField;
use log::{debug, info};
@ -16,7 +16,6 @@ use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use zcash_primitives::sapling::Node;
use zcash_primitives::zip32::ExtendedFullViewingKey;
@ -25,7 +24,7 @@ pub async fn scan_all(fvks: &[ExtendedFullViewingKey]) -> anyhow::Result<()> {
let fvks: HashMap<_, _> = fvks
.iter()
.enumerate()
.map(|(i, fvk)| (i as u32, fvk.clone()))
.map(|(i, fvk)| (i as u32, AccountViewKey::from_fvk(fvk)))
.collect();
let decrypter = DecryptNode::new(fvks);
@ -66,6 +65,9 @@ struct BlockMetadata {
timestamp: u32,
}
#[derive(Debug)]
struct TxIdSet(Vec<u32>);
impl std::fmt::Debug for Blocks {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Blocks of len {}", self.0.len())
@ -86,34 +88,24 @@ pub async fn sync_async(
let db_path = db_path.to_string();
let mut client = connect_lightwalletd(&ld_url).await?;
let (start_height, mut prev_hash, fvks) = {
let (start_height, mut prev_hash, vks) = {
let db = DbAdapter::new(&db_path)?;
let height = db.get_db_height()?;
let hash = db.get_db_hash(height)?;
let fvks = db.get_fvks()?;
(height, hash, fvks)
let vks = db.get_fvks()?;
(height, hash, vks)
};
let end_height = get_latest_height(&mut client).await?;
let end_height = (end_height - target_height_offset).max(start_height);
let fvks: HashMap<_, _> = fvks
.iter()
.map(|(&account, fvk)| {
let fvk = decode_extended_full_viewing_key(
NETWORK.hrp_sapling_extended_full_viewing_key(),
&fvk,
)
.unwrap()
.unwrap();
(account, fvk)
})
.collect();
let decrypter = DecryptNode::new(fvks);
let decrypter = DecryptNode::new(vks);
let (downloader_tx, mut download_rx) = mpsc::channel::<Range<u32>>(1);
let (processor_tx, mut processor_rx) = mpsc::channel::<Blocks>(1);
let ld_url2 = ld_url.clone();
let db_path2 = db_path.clone();
let downloader = tokio::spawn(async move {
let mut client = connect_lightwalletd(&ld_url2).await?;
while let Some(range) = download_rx.recv().await {
@ -138,8 +130,8 @@ pub async fn sync_async(
let processor = tokio::spawn(async move {
let db = DbAdapter::new(&db_path)?;
db.synchronous(false)?;
let mut nfs = db.get_nullifiers()?;
let mut new_tx_ids: Vec<u32> = vec![];
let (mut tree, mut witnesses) = db.get_tree()?;
let mut bp = BlockProcessor::new(&tree, &witnesses);
@ -151,15 +143,21 @@ pub async fn sync_async(
continue;
}
db.begin_transaction()?;
let mut new_tx_ids: Vec<u32> = vec![];
let dec_blocks = decrypter.decrypt_blocks(&blocks.0);
let mut witnesses: Vec<Witness> = vec![];
log::info!("Dec start : {}", dec_blocks[0].height);
let start = Instant::now();
for b in dec_blocks.iter() {
let mut my_nfs: Vec<Nf> = vec![];
for nf in b.spends.iter() {
if let Some(&nf_ref) = nfs.get(nf) {
log::debug!("NF FOUND {} {}", nf_ref.id_note, b.height);
log::info!("NF FOUND {} {}", nf_ref.id_note, b.height);
db.mark_spent(nf_ref.id_note, b.height)?;
my_nfs.push(*nf);
nfs.remove(nf);
}
}
if !b.notes.is_empty() {
@ -204,8 +202,10 @@ pub async fn sync_async(
},
);
let w = Witness::new(p as usize, id_note, Some(n.clone()));
witnesses.push(w);
if !n.viewonly {
let w = Witness::new(p as usize, id_note, Some(n.clone()));
witnesses.push(w);
}
}
if !my_nfs.is_empty() {
@ -233,7 +233,9 @@ pub async fn sync_async(
absolute_position_at_block_start += b.count_outputs as usize;
}
log::info!("Dec end : {}", start.elapsed().as_millis());
let start = Instant::now();
let mut nodes: Vec<Node> = vec![];
for cb in blocks.0.iter() {
for tx in cb.vtx.iter() {
@ -260,6 +262,16 @@ pub async fn sync_async(
timestamp: block.time,
});
}
log::info!("Witness : {}", start.elapsed().as_millis());
db.commit()?;
let start = Instant::now();
if get_tx && !new_tx_ids.is_empty() {
retrieve_tx_info(&mut client, &db_path2, &new_tx_ids).await.unwrap();
}
log::info!("Transaction Details : {}", start.elapsed().as_millis());
log::info!("progress: {}", blocks.0[0].height);
let callback = proc_callback.lock().await;
callback(blocks.0[0].height as u32);
@ -278,15 +290,12 @@ pub async fn sync_async(
}
}
if get_tx && !new_tx_ids.is_empty() {
retrieve_tx_info(&new_tx_ids, &ld_url, &db_path).await?;
}
let callback = progress_callback.lock().await;
callback(end_height);
log::debug!("Witnesses {}", witnesses.len());
db.purge_old_witnesses(end_height - 100)?;
Ok::<_, anyhow::Error>(())
});

View File

@ -1,4 +1,4 @@
use crate::{connect_lightwalletd, CompactTxStreamerClient, DbAdapter, TxFilter, NETWORK};
use crate::{CompactTxStreamerClient, DbAdapter, TxFilter, NETWORK};
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use tonic::transport::Channel;
@ -107,10 +107,9 @@ pub async fn decode_transaction(
Ok(tx_info)
}
pub async fn retrieve_tx_info(tx_ids: &[u32], ld_url: &str, db_path: &str) -> anyhow::Result<()> {
let mut tx_ids_set: HashSet<u32> = HashSet::new();
let mut client = connect_lightwalletd(ld_url).await?;
pub async fn retrieve_tx_info(client: &mut CompactTxStreamerClient<Channel>, db_path: &str, tx_ids: &[u32]) -> anyhow::Result<()> {
let db = DbAdapter::new(db_path)?;
let mut tx_ids_set: HashSet<u32> = HashSet::new();
let nfs = db.get_nullifiers_raw()?;
let mut nf_map: HashMap<(u32, Vec<u8>), u64> = HashMap::new();
for nf in nfs.iter() {
@ -122,7 +121,7 @@ pub async fn retrieve_tx_info(tx_ids: &[u32], ld_url: &str, db_path: &str) -> an
let (account, height, tx_hash) = db.get_txhash(id_tx)?;
let fvk = db.get_ivk(account)?;
let tx_info =
decode_transaction(&mut client, &nf_map, account, &fvk, &tx_hash, height).await?;
decode_transaction(client, &nf_map, account, &fvk, &tx_hash, height).await?;
if !tx_info.address.is_empty() && !tx_info.memo.is_empty() {
if let Some(contact) = decode_contact(&tx_info.address, &tx_info.memo)? {
db.store_contact(account, &contact)?;