diff --git a/src/api/dart_ffi.rs b/src/api/dart_ffi.rs index 6836ab7..bb8496d 100644 --- a/src/api/dart_ffi.rs +++ b/src/api/dart_ffi.rs @@ -128,8 +128,9 @@ pub unsafe extern "C" fn get_lwd_url(coin: u8) -> *mut c_char { #[no_mangle] pub unsafe extern "C" fn reset_app() { let res = || { - crate::api::account::reset_db(0)?; - crate::api::account::reset_db(1)?; + for i in 0..MAX_COINS { + crate::api::account::reset_db(i)?; + } Ok(()) }; log_error(res()) diff --git a/src/api/sync.rs b/src/api/sync.rs index 4c18033..a6dd2df 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -9,6 +9,7 @@ use tokio::sync::Mutex; use tonic::transport::Channel; use tonic::Request; use zcash_primitives::sapling::Note; +use crate::sync::CTree; const DEFAULT_CHUNK_SIZE: u32 = 100_000; diff --git a/src/chain.rs b/src/chain.rs index 4353314..a48c087 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -4,12 +4,12 @@ use crate::lw_rpc::*; use crate::scan::Blocks; use ff::PrimeField; use futures::{future, FutureExt}; -use log::info; use rand::prelude::SliceRandom; use rand::rngs::OsRng; use rayon::prelude::*; use std::collections::HashMap; use std::convert::TryInto; +use std::future::Future; use std::marker::PhantomData; use std::path::Path; use std::sync::Mutex; @@ -140,13 +140,12 @@ const MAX_OUTPUTS_PER_CHUNKS: usize = 200_000; #[allow(unused_variables)] pub async fn download_chain( client: &mut CompactTxStreamerClient, - n_ivks: usize, start_height: u32, end_height: u32, mut prev_hash: Option<[u8; 32]>, max_cost: u32, - blocks_tx: Sender, cancel: &'static Mutex, + handler: Sender, ) -> anyhow::Result<()> { let outputs_per_chunk = get_available_memory()? / get_mem_per_output(); let outputs_per_chunk = outputs_per_chunk.min(MAX_OUTPUTS_PER_CHUNKS); @@ -210,7 +209,10 @@ pub async fn download_chain( // output let out = cbs; cbs = Vec::new(); - blocks_tx.send(Blocks(out, total_block_size)).await.unwrap(); + let blocks = Blocks(out, total_block_size); + if !blocks.0.is_empty() { + let _ = handler.send(blocks).await; + } output_count = 0; total_block_size = 0; } @@ -218,7 +220,10 @@ pub async fn download_chain( cbs.push(block); output_count += block_output_count; } - let _ = blocks_tx.send(Blocks(cbs, total_block_size)).await; + let blocks = Blocks(cbs, total_block_size); + if !blocks.0.is_empty() { + let _ = handler.send(blocks).await; + } Ok(()) } diff --git a/src/db.rs b/src/db.rs index 5e476c6..3f620be 100644 --- a/src/db.rs +++ b/src/db.rs @@ -47,6 +47,7 @@ pub struct ReceivedNote { pub spent: Option, } +#[derive(Clone)] pub struct ReceivedNoteShort { pub id: u32, pub account: u32, @@ -425,6 +426,19 @@ impl DbAdapter { Ok(()) } + pub fn store_tree(height: u32, hash: &[u8], tree: &CTree, db_tx: &Connection, shielded_pool: &str) -> anyhow::Result<()> { + let mut bb: Vec = vec![]; + tree.write(&mut bb)?; + db_tx.execute(&format!("INSERT INTO blocks(height, hash, {pool}_tree, timestamp) VALUES (?1,?2,?3,0) ON CONFLICT DO UPDATE + SET {pool}_tree = excluded.{pool}_tree", pool = shielded_pool), params![height, hash, &bb])?; + Ok(()) + } + + pub fn store_block_timestamp(&self, height: u32, timestamp: u32) -> anyhow::Result<()> { + self.connection.execute("UPDATE blocks SET timestamp = ?1 WHERE height = ?2", params![timestamp, height])?; + Ok(()) + } + pub fn store_tx_metadata(&self, id_tx: u32, tx_info: &TransactionInfo) -> anyhow::Result<()> { self.connection.execute( "UPDATE transactions SET address = ?1, memo = ?2 WHERE id_tx = ?3", @@ -500,7 +514,8 @@ impl DbAdapter { } pub fn get_tree(&self) -> anyhow::Result<(TreeCheckpoint, TreeCheckpoint)> { - self.get_tree_by_name("sapling"); // TODO: pack in TreeCheckpoint + self.get_tree_by_name("sapling")?; // TODO: pack in TreeCheckpoint + todo!() } pub fn get_tree_by_name(&self, shielded_pool: &str) -> anyhow::Result { @@ -582,14 +597,14 @@ impl DbAdapter { pub fn get_unspent_nullifiers( &self, - account: u32, ) -> anyhow::Result> { - let sql = "SELECT id_note, nf, value FROM received_notes WHERE account = ?1 AND (spent IS NULL OR spent = 0)"; + let sql = "SELECT id_note, account, nf, value FROM received_notes WHERE spent IS NULL OR spent = 0"; let mut statement = self.connection.prepare(sql)?; - let nfs_res = statement.query_map(params![account], |row| { + let nfs_res = statement.query_map(params![], |row| { let id: u32 = row.get(0)?; - let nf: Vec = row.get(1)?; - let value: i64 = row.get(2)?; + let account: u32 = row.get(1)?; + let nf: Vec = row.get(2)?; + let value: i64 = row.get(3)?; let nf: [u8; 32] = nf.try_into().unwrap(); let nf = Nf(nf); Ok(ReceivedNoteShort { @@ -1271,8 +1286,8 @@ pub struct AccountData { #[cfg(test)] mod tests { use crate::db::{DbAdapter, DEFAULT_DB_PATH, ReceivedNote}; - use crate::commitment::{CTree, Witness}; use zcash_params::coin::CoinType; + use crate::sync::{CTree, Witness}; #[test] fn test_db() { @@ -1303,10 +1318,10 @@ mod tests { let witness = Witness { position: 10, id_note: 0, - note: None, tree: CTree::new(), filled: vec![], cursor: CTree::new(), + cmx: [0u8; 32] }; DbAdapter::store_witnesses(&db_tx, &witness, 1000, 1).unwrap(); db_tx.commit().unwrap(); diff --git a/src/sapling/hash.rs b/src/sapling/hash.rs index c3712b0..d65cad8 100644 --- a/src/sapling/hash.rs +++ b/src/sapling/hash.rs @@ -121,7 +121,7 @@ mod tests { use std::convert::TryInto; use rand::RngCore; use rand::rngs::OsRng; - use crate::pedersen_hash; + use crate::hash::pedersen_hash; use crate::sapling::hash::hash_combine; #[test] diff --git a/src/scan.rs b/src/scan.rs index e2fe1ac..b7e5b2a 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -4,10 +4,7 @@ use serde::Serialize; use std::cmp::Ordering; use crate::transaction::retrieve_tx_info; -use crate::{ - connect_lightwalletd, CompactBlock, CompactSaplingOutput, - CompactTx -}; +use crate::{connect_lightwalletd, CompactBlock, CompactSaplingOutput, CompactTx, DbAdapterBuilder, chain}; use crate::chain::{DecryptNode, download_chain}; use ff::PrimeField; @@ -25,6 +22,9 @@ use zcash_params::coin::{get_coin_chain, CoinType}; use zcash_primitives::consensus::{Network, Parameters}; use zcash_primitives::sapling::{Node, Note}; +use zcash_primitives::sapling::note_encryption::SaplingDomain; +use crate::sapling::{DecryptedSaplingNote, SaplingDecrypter, SaplingHasher, SaplingViewKey}; +use crate::sync::{CTree, Synchronizer, WarpProcessor}; pub struct Blocks(pub Vec, pub usize); @@ -58,7 +58,86 @@ pub struct TxIdHeight { index: u32, } -pub async fn sync_async( +type SaplingSynchronizer = Synchronizer, SaplingViewKey, DecryptedSaplingNote, + SaplingDecrypter, SaplingHasher>; + +pub async fn sync_async<'a>( + coin_type: CoinType, + _chunk_size: u32, + get_tx: bool, + db_path: &'a str, + target_height_offset: u32, + max_cost: u32, + progress_callback: AMProgressCallback, + cancel: &'static std::sync::Mutex, + ld_url: &'a str, +) -> anyhow::Result<()> { + let ld_url = ld_url.to_owned(); + let db_path = db_path.to_owned(); + let network = { + let chain = get_coin_chain(coin_type); + *chain.network() + }; + + let mut client = connect_lightwalletd(&ld_url).await?; + let (start_height, prev_hash, sapling_vks) = { + let db = DbAdapter::new(coin_type, &db_path)?; + let height = db.get_db_height()?; + let hash = db.get_db_hash(height)?; + let vks = db.get_fvks()?; + let sapling_vks: Vec<_> = vks.iter().map(|(&account, ak)| { + SaplingViewKey { + account, + fvk: ak.fvk.clone(), + ivk: ak.ivk.clone() + } + }).collect(); + (height, hash, sapling_vks) + }; + let end_height = get_latest_height(&mut client).await?; + let end_height = (end_height - target_height_offset).max(start_height); + if start_height >= end_height { + return Ok(()); + } + + let (blocks_tx, mut blocks_rx) = mpsc::channel::(1); + tokio::spawn(async move { + download_chain(&mut client, start_height, end_height, prev_hash, max_cost, cancel, blocks_tx).await?; + Ok::<_, anyhow::Error>(()) + }); + + let db_builder = DbAdapterBuilder { coin_type, db_path: db_path.clone() }; + while let Some(blocks) = blocks_rx.recv().await { + let first_block = blocks.0.first().unwrap(); // cannot be empty because blocks are not + log::info!("Height: {}", first_block.height); + let last_block = blocks.0.last().unwrap(); + let last_height = last_block.height as u32; + let last_timestamp = last_block.time; + + let decrypter = SaplingDecrypter::new(network); + let warper = WarpProcessor::new(SaplingHasher::default()); + let mut sapling_synchronizer = SaplingSynchronizer::new( + decrypter, + warper, + sapling_vks.clone(), + db_builder.clone(), + "sapling".to_string(), + ); + sapling_synchronizer.initialize()?; + sapling_synchronizer.process(blocks.0)?; + + // TODO - Orchard + + let db = db_builder.build()?; + db.store_block_timestamp(last_height, last_timestamp)?; + } + + Ok(()) +} + + + +fn sync_async_old( coin_type: CoinType, _chunk_size: u32, get_tx: bool, @@ -69,6 +148,7 @@ pub async fn sync_async( cancel: &'static std::sync::Mutex, ld_url: &str, ) -> anyhow::Result<()> { + /* let ld_url = ld_url.to_owned(); let db_path = db_path.to_string(); let network = { @@ -84,6 +164,13 @@ pub async fn sync_async( let vks = db.get_fvks()?; (height, hash, vks) }; + let saplingvks: Vec<_> = vks.iter().map(|(&account, vk)| { + SaplingViewKey { + account, + fvk: vk.fvk.clone(), + ivk: vk.ivk.clone(), + } + }).collect(); let end_height = get_latest_height(&mut client).await?; let end_height = (end_height - target_height_offset).max(start_height); @@ -92,8 +179,6 @@ pub async fn sync_async( } let n_ivks = vks.len(); - let decrypter = DecryptNode::new(vks); - let (decryptor_tx, mut decryptor_rx) = mpsc::channel::(1); let (processor_tx, mut processor_rx) = mpsc::channel::<(Vec, usize)>(1); @@ -135,14 +220,12 @@ pub async fn sync_async( let processor = tokio::spawn(async move { let mut db = DbAdapter::new(coin_type, &db_path2)?; - let mut nfs = db.get_nullifiers()?; while let Some((dec_blocks, blocks_size)) = processor_rx.recv().await { if dec_blocks.is_empty() { continue; } progress.downloaded += blocks_size; - let (mut sapling_checkpoint, mut orchard_checkpoint) = db.get_tree()?; /* TODO - Change to WarpProcessors & Trial Decryptors - sapling & orchard - Feed block into WP sapling @@ -152,19 +235,26 @@ pub async fn sync_async( */ - let mut bp = BlockProcessor::new(&tree, &witnesses); - let mut absolute_position_at_block_start = tree.get_position(); - log::info!("start processing - {}", dec_blocks[0].height); log::info!("Time {:?}", chrono::offset::Local::now()); let start = Instant::now(); - let mut new_ids_tx: HashMap = HashMap::new(); - let mut witnesses: Vec = vec![]; + let decrypter = DecryptNode::new(vks); + + let mut sapling_synchronizer = SaplingSynchronizer::new( + decrypter, + warper, + saplingvks.clone(), + DbAdapterBuilder { coin_type, db_path: db_path.clone() }, + "sapling".to_string(), + ); + sapling_synchronizer.initialize()?; { // db tx scope let db_tx = db.begin_transaction()?; + + /* let outputs = dec_blocks .iter() .map(|db| db.count_outputs as usize) @@ -271,12 +361,15 @@ pub async fn sync_async( } absolute_position_at_block_start += b.count_outputs as usize; + } + */ log::info!("Dec end : {}", start.elapsed().as_millis()); db_tx.commit()?; } + /* let start = Instant::now(); let mut nodes: Vec = vec![]; for block in dec_blocks.iter() { @@ -318,22 +411,24 @@ pub async fn sync_async( tree = new_tree; witnesses = new_witnesses; + + */ if let Some(dec_block) = dec_blocks.last() { { let block = &dec_block.compact_block; let mut db_transaction = db.begin_transaction()?; let height = block.height as u32; - for w in witnesses.iter() { - DbAdapter::store_witnesses(&db_transaction, w, height, w.id_note)?; - } - DbAdapter::store_block( - &mut db_transaction, - height, - &block.hash, - block.time, - &tree, - None, - )?; + // for w in witnesses.iter() { + // DbAdapter::store_witnesses(&db_transaction, w, height, w.id_note)?; + // } + // DbAdapter::store_block( + // &mut db_transaction, + // height, + // &block.hash, + // block.time, + // &tree, + // None, + // )?; db_transaction.commit()?; // db_transaction is dropped here } @@ -379,6 +474,8 @@ pub async fn sync_async( log::info!("Sync completed"); + + */ Ok(()) } diff --git a/src/sync.rs b/src/sync.rs index e5886ef..923cf68 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -37,14 +37,31 @@ impl + Sync, TD: TrialDecrypter + Sync, H: Hasher> Synchronizer { + pub fn new(decrypter: TD, warper: WarpProcessor, vks: Vec, db: DbAdapterBuilder, shielded_pool: String) -> Self { + Synchronizer { + decrypter, + warper, + vks, + db, + shielded_pool, + + note_position: 0, + nullifiers: HashMap::default(), + tree: CTree::new(), + witnesses: vec![], + _phantom: Default::default() + } + } + + pub fn initialize(&mut self) -> Result<()> { let db = self.db.build()?; let TreeCheckpoint { tree, witnesses } = db.get_tree_by_name(&self.shielded_pool)?; self.tree = tree; self.witnesses = witnesses; + self.note_position = self.tree.get_position(); for vk in self.vks.iter() { - let account = vk.account(); - let nfs = db.get_unspent_nullifiers(account)?; + let nfs = db.get_unspent_nullifiers()?; for rn in nfs.into_iter() { self.nullifiers.insert(rn.nf.clone(), rn); } @@ -93,6 +110,7 @@ impl { impl WitnessBuilder { fn new(tree_builder: &CTreeBuilder, prev_witness: &Witness, count: usize) -> Self { let position = prev_witness.position; + // log::info!("Witness::new - {} {},{}", position, tree_builder.start, tree_builder.start + count); let inside = position >= tree_builder.start && position < tree_builder.start + count; WitnessBuilder { witness: prev_witness.clone(), @@ -434,7 +435,6 @@ impl Builder for WitnessBuilder { let depth = context.depth; let tree = &mut self.witness.tree; - if self.inside { let rp = self.p - context.adjusted_start(&offset); if depth == 0 { diff --git a/src/sync/trial_decrypt.rs b/src/sync/trial_decrypt.rs index 46d87d6..742b174 100644 --- a/src/sync/trial_decrypt.rs +++ b/src/sync/trial_decrypt.rs @@ -59,9 +59,9 @@ pub struct CompactOutputBytes { impl From<&CompactSaplingOutput> for CompactOutputBytes { fn from(co: &CompactSaplingOutput) -> Self { CompactOutputBytes { - epk: co.epk.clone().try_into().unwrap(), - cmx: co.cmu.clone().try_into().unwrap(), - ciphertext: co.ciphertext.clone().try_into().unwrap(), + epk: if co.epk.is_empty() { [0u8; 32] } else { co.epk.clone().try_into().unwrap() } , + cmx: co.cmu.clone().try_into().unwrap(), // cannot be filtered out + ciphertext: if co.ciphertext.is_empty() { [0u8; 52] } else { co.ciphertext.clone().try_into().unwrap() }, } } } diff --git a/src/transaction.rs b/src/transaction.rs index 36846a8..b3985a1 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -295,7 +295,6 @@ mod tests { Network::MainNetwork.hrp_sapling_extended_full_viewing_key(), &fvk, ) - .unwrap() .unwrap(); let tx_info = decode_transaction( &Network::MainNetwork,