diff --git a/Cargo.toml b/Cargo.toml index 28612cd..8dc82b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ chacha20poly1305 = "0.9.0" base64 = "^0.13" base58check = "0.1.0" raptorq = "1.7.0" +sysinfo = "0.25" ledger-apdu = { version = "0.9.0", optional = true } hmac = { version = "0.12.1", optional = true } @@ -87,6 +88,8 @@ node-bindgen = { version = "4.0", optional = true } rustacuda = { version = "0.1.3", optional = true } rustacuda_core = { version = "0.1.2", optional = true } +# ash-warp = { path = "../../../ash-warp", optional = true } + [features] ledger = ["ledger-apdu", "hmac", "ed25519-bip32", "ledger-transport-hid"] ledger_sapling = ["ledger"] @@ -94,6 +97,7 @@ dart_ffi = ["allo-isolate", "once_cell", "android_logger"] rpc = ["rocket", "dotenv"] nodejs = ["node-bindgen"] cuda = ["rustacuda", "rustacuda_core"] +# vulkan = ["ash-warp"] # librustzcash synced to 35023ed8ca2fb1061e78fd740b640d4eefcc5edd diff --git a/src/builder.rs b/src/builder.rs index 1e252fb..c798496 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,4 +1,6 @@ use crate::commitment::{CTree, Witness}; +#[cfg(feature = "cuda")] +use crate::cuda::CUDA_PROCESSOR; use crate::hash::{pedersen_hash, pedersen_hash_inner}; use ff::PrimeField; use group::Curve; @@ -6,8 +8,6 @@ use jubjub::{AffinePoint, ExtendedPoint}; use rayon::prelude::IntoParallelIterator; use rayon::prelude::*; use zcash_primitives::sapling::Node; -#[cfg(feature = "cuda")] -use crate::cuda::CUDA_PROCESSOR; #[inline(always)] fn batch_node_combine1(depth: usize, left: &Node, right: &Node) -> ExtendedPoint { @@ -167,7 +167,12 @@ impl CTreeBuilder { } } -fn combine_level_soft(commitments: &mut [Node], offset: Option, n: usize, depth: usize) -> usize { +fn combine_level_soft( + commitments: &mut [Node], + offset: Option, + n: usize, + depth: usize, +) -> usize { assert_eq!(n % 2, 0); let nn = n / 2; @@ -206,9 +211,16 @@ fn combine_level_soft(commitments: &mut [Node], offset: Option, n: usize, } #[cfg(feature = "cuda")] -fn combine_level_cuda(commitments: &mut [Node], offset: Option, n: usize, depth: usize) -> usize { +fn combine_level_cuda( + commitments: &mut [Node], + offset: Option, + n: usize, + depth: usize, +) -> usize { assert_eq!(n % 2, 0); - if n == 0 { return 0; } + if n == 0 { + return 0; + } let mut hasher = CUDA_PROCESSOR.lock().unwrap(); if let Some(hasher) = hasher.as_mut() { @@ -221,8 +233,7 @@ fn combine_level_cuda(commitments: &mut [Node], offset: Option, n: usize, commitments[i] = Node::new(new_hashes[i]); } nn - } - else { + } else { combine_level_soft(commitments, offset, n, depth) } } diff --git a/src/chain.rs b/src/chain.rs index 19e1930..07b38c7 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -1,18 +1,22 @@ use crate::advance_tree; use crate::commitment::{CTree, Witness}; +#[cfg(feature = "cuda")] +use crate::cuda::CUDA_PROCESSOR; use crate::db::AccountViewKey; use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient; use crate::lw_rpc::*; -use crate::scan::{Blocks, MAX_OUTPUTS_PER_CHUNK}; +use crate::scan::Blocks; use ff::PrimeField; use futures::{future, FutureExt}; use log::info; use rayon::prelude::*; use std::collections::HashMap; +use std::convert::TryInto; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::time::Instant; +use sysinfo::{System, SystemExt}; use thiserror::Error; use tokio::sync::mpsc::Sender; use tokio::time::timeout; @@ -26,8 +30,6 @@ use zcash_primitives::sapling::note_encryption::SaplingDomain; use zcash_primitives::sapling::{Node, Note, PaymentAddress}; use zcash_primitives::transaction::components::sapling::CompactOutputDescription; use zcash_primitives::zip32::ExtendedFullViewingKey; -#[cfg(feature = "cuda")] -use crate::cuda::CUDA_PROCESSOR; pub async fn get_latest_height( client: &mut CompactTxStreamerClient, @@ -97,15 +99,53 @@ pub enum ChainError { Busy, } +fn get_mem_per_output() -> usize { + if cfg!(feature = "cuda") { + 1000 + } else { + 5 + } +} + +#[cfg(feature = "cuda")] +fn get_available_memory() -> anyhow::Result { + let cuda = CUDA_PROCESSOR.lock().unwrap(); + if let Some(cuda) = cuda.as_ref() { + cuda.total_memory() + } else { + get_system_available_memory() + } +} + +#[cfg(not(feature = "cuda"))] +fn get_available_memory() -> anyhow::Result { + get_system_available_memory() +} + +fn get_system_available_memory() -> anyhow::Result { + let mut sys = System::new(); + sys.refresh_memory(); + let mem_available = sys.available_memory() as usize; + Ok(mem_available) +} + +const MAX_OUTPUTS_PER_CHUNKS: usize = 200_000; + /* download [start_height+1, end_height] inclusive */ +#[allow(unused_variables)] pub async fn download_chain( client: &mut CompactTxStreamerClient, + n_ivks: usize, start_height: u32, end_height: u32, mut prev_hash: Option<[u8; 32]>, blocks_tx: Sender, cancel: &'static AtomicBool, ) -> anyhow::Result<()> { + let outputs_per_chunk = get_available_memory()? / get_mem_per_output(); + let outputs_per_chunk = outputs_per_chunk.min(MAX_OUTPUTS_PER_CHUNKS); + log::info!("Outputs per chunk = {}", outputs_per_chunk); + let mut output_count = 0; let mut cbs: Vec = Vec::new(); let range = BlockRange { @@ -143,7 +183,7 @@ pub async fn download_chain( } let block_output_count: usize = block.vtx.iter().map(|tx| tx.outputs.len()).sum(); - if output_count + block_output_count > MAX_OUTPUTS_PER_CHUNK { + if output_count + block_output_count > outputs_per_chunk { // output let out = cbs; cbs = Vec::new(); @@ -197,14 +237,10 @@ pub struct DecryptedNote { } pub fn to_output_description(co: &CompactSaplingOutput) -> CompactOutputDescription { - let mut cmu = [0u8; 32]; - cmu.copy_from_slice(&co.cmu); + let cmu: [u8; 32] = co.cmu.clone().try_into().unwrap(); let cmu = bls12_381::Scalar::from_repr(cmu).unwrap(); - let mut epk = [0u8; 32]; - epk.copy_from_slice(&co.epk); - // let epk = jubjub::ExtendedPoint::from_bytes(&epk).unwrap(); - let mut enc_ciphertext = [0u8; 52]; - enc_ciphertext.copy_from_slice(&co.ciphertext); + let epk: [u8; 32] = co.epk.clone().try_into().unwrap(); + let enc_ciphertext: [u8; 52] = co.ciphertext.clone().try_into().unwrap(); CompactOutputDescription { ephemeral_key: EphemeralKeyBytes::from(epk), @@ -260,11 +296,9 @@ impl<'a, N: Parameters> ShieldedOutput, COMPACT_NOTE_SIZE> fn ephemeral_key(&self) -> EphemeralKeyBytes { self.epk.clone() } - fn cmstar_bytes(&self) -> as Domain>::ExtractedCommitmentBytes { self.cmu } - fn enc_ciphertext(&self) -> &[u8; COMPACT_NOTE_SIZE] { &self.ciphertext } @@ -302,27 +336,6 @@ fn decrypt_notes<'a, N: Parameters>( ); outputs.push((domain, output)); - // let od = to_output_description(co); - // - // for (&account, vk) in vks.iter() { - // if let Some((note, pa)) = - // try_sapling_compact_note_decryption(network, height, &vk.ivk, &od) - // { - // notes.push(DecryptedNote { - // account, - // ivk: vk.fvk.clone(), - // note, - // pa, - // viewonly: vk.viewonly, - // position_in_block: count_outputs as usize, - // height: block.height as u32, - // tx_index, - // txid: vtx.hash.clone(), - // output_index, - // }); - // } - // } - count_outputs += 1; } } else { @@ -331,10 +344,6 @@ fn decrypt_notes<'a, N: Parameters>( } } - if outputs.len() >= MAX_OUTPUTS_PER_CHUNK { - log::warn!("outputs overflow {}", outputs.len()); - } - let start = Instant::now(); let notes_decrypted = try_compact_note_decryption::, AccountOutput>(&vvks, &outputs); @@ -406,13 +415,12 @@ impl DecryptNode { network: &Network, blocks: &'a [CompactBlock], ) -> Vec> { + if blocks.is_empty() { return vec![]; } let mut cuda_processor = CUDA_PROCESSOR.lock().unwrap(); if let Some(cuda_processor) = cuda_processor.as_mut() { - let mut decrypted_blocks = vec![]; - for (account, vk) in self.vks.iter() { - decrypted_blocks.extend(cuda_processor.trial_decrypt(network, *account, &vk.fvk, blocks).unwrap()); - } - return decrypted_blocks; + return cuda_processor + .trial_decrypt(network, self.vks.iter(), blocks) + .unwrap(); } self.decrypt_blocks_soft(network, blocks) } @@ -469,9 +477,6 @@ fn calculate_tree_state_v1( } } } - // let mut bb: Vec = vec![]; - // tree_state.write(&mut bb).unwrap(); - // hex::encode(bb) witnesses } @@ -524,6 +529,7 @@ pub fn calculate_tree_state_v2(cbs: &[CompactBlock], blocks: &[DecryptedBlock]) } pub async fn connect_lightwalletd(url: &str) -> anyhow::Result> { + log::info!("LWD URL: {}", url); let mut channel = tonic::transport::Channel::from_shared(url.to_owned())?; if url.starts_with("https") { let pem = include_bytes!("ca.pem"); @@ -557,120 +563,3 @@ pub async fn get_best_server(servers: &[String]) -> Option { .max_by_key(|(_, h)| *h) .map(|x| x.0) } - -// pub async fn sync( -// network: &Network, -// vks: HashMap, -// ld_url: &str, -// ) -> anyhow::Result<()> { -// let decrypter = DecryptNode::new(vks); -// let mut client = connect_lightwalletd(ld_url).await?; -// let start_height: u32 = network -// .activation_height(NetworkUpgrade::Sapling) -// .unwrap() -// .into(); -// let end_height = get_latest_height(&mut client).await?; -// -// let start = Instant::now(); -// let cbs = download_chain(&mut client, start_height, end_height, None).await?; -// eprintln!("Download chain: {} ms", start.elapsed().as_millis()); -// -// let start = Instant::now(); -// let blocks = decrypter.decrypt_blocks(network, &cbs); -// eprintln!("Decrypt Notes: {} ms", start.elapsed().as_millis()); -// let batch_decrypt_elapsed: usize = blocks.iter().map(|b| b.elapsed).sum(); -// eprintln!(" Batch Decrypt: {} ms", batch_decrypt_elapsed); -// -// let start = Instant::now(); -// let witnesses = calculate_tree_state_v2(&cbs, &blocks); -// eprintln!("Tree State & Witnesses: {} ms", start.elapsed().as_millis()); -// -// eprintln!("# Witnesses {}", witnesses.len()); -// for w in witnesses.iter() { -// let mut bb: Vec = vec![]; -// w.write(&mut bb).unwrap(); -// log::info!("{}", hex::encode(&bb)); -// } -// -// Ok(()) -// } - -#[cfg(test)] -mod tests { - #[allow(unused_imports)] - use crate::chain::{ - calculate_tree_state_v1, calculate_tree_state_v2, download_chain, get_latest_height, - get_tree_state, DecryptNode, - }; - use crate::db::AccountViewKey; - use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient; - use crate::LWD_URL; - - use std::collections::HashMap; - use std::time::Instant; - use zcash_client_backend::encoding::decode_extended_full_viewing_key; - use zcash_primitives::consensus::{Network, NetworkUpgrade, Parameters}; - - const NETWORK: &Network = &Network::MainNetwork; - - #[tokio::test] - async fn test_get_latest_height() -> anyhow::Result<()> { - let mut client = CompactTxStreamerClient::connect(LWD_URL).await?; - let height = get_latest_height(&mut client).await?; - assert!(height > 1288000); - Ok(()) - } - - #[tokio::test] - async fn test_download_chain() -> anyhow::Result<()> { - dotenv::dotenv().unwrap(); - let fvk = dotenv::var("FVK").unwrap(); - - let mut fvks: HashMap = HashMap::new(); - let fvk = - decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &fvk) - .unwrap() - .unwrap(); - fvks.insert(1, AccountViewKey::from_fvk(&fvk)); - let decrypter = DecryptNode::new(fvks); - let mut client = CompactTxStreamerClient::connect(LWD_URL).await?; - let start_height: u32 = NETWORK - .activation_height(NetworkUpgrade::Sapling) - .unwrap() - .into(); - let end_height = get_latest_height(&mut client).await?; - - let start = Instant::now(); - let cbs = download_chain(&mut client, start_height, end_height, None).await?; - eprintln!("Download chain: {} ms", start.elapsed().as_millis()); - - let start = Instant::now(); - let blocks = decrypter.decrypt_blocks(&Network::MainNetwork, &cbs); - eprintln!("Decrypt Notes: {} ms", start.elapsed().as_millis()); - - // no need to calculate tree before the first note if we can - // get it from the server - // disabled because I want to see the performance of a complete scan - - // let first_block = blocks.iter().find(|b| !b.notes.is_empty()).unwrap(); - // let height = first_block.height - 1; - // let tree_state = get_tree_state(&mut client, height).await; - // let tree_state = hex::decode(tree_state).unwrap(); - // let tree_state = CommitmentTree::::read(&*tree_state).unwrap(); - - // let witnesses = calculate_tree_state(&cbs, &blocks, 0, tree_state); - - let start = Instant::now(); - let witnesses = calculate_tree_state_v2(&cbs, &blocks); - eprintln!("Tree State & Witnesses: {} ms", start.elapsed().as_millis()); - - eprintln!("# Witnesses {}", witnesses.len()); - for w in witnesses.iter() { - let mut bb: Vec = vec![]; - w.write(&mut bb).unwrap(); - log::info!("{}", hex::encode(&bb)); - } - - Ok(()) - } -} diff --git a/src/cuda.rs b/src/cuda.rs index 18cef6b..3eae87a 100644 --- a/src/cuda.rs +++ b/src/cuda.rs @@ -1,9 +1,10 @@ -use std::sync::Mutex; use lazy_static::lazy_static; +use std::sync::Mutex; mod processor; use processor::CudaProcessor; lazy_static! { - pub static ref CUDA_PROCESSOR: Mutex> = Mutex::new(CudaProcessor::new().ok()); + pub static ref CUDA_PROCESSOR: Mutex> = + Mutex::new(CudaProcessor::new().ok()); } diff --git a/src/cuda/processor.rs b/src/cuda/processor.rs index a839b9c..2f0569d 100644 --- a/src/cuda/processor.rs +++ b/src/cuda/processor.rs @@ -1,20 +1,19 @@ -use std::convert::TryInto; -use std::ffi::CString; -use jubjub::{Fq, ExtendedPoint}; -use rustacuda::launch; -use rustacuda::prelude::*; -use group::Curve; -use ff::BatchInverter; +use crate::chain::{DecryptedBlock, DecryptedNote, Nf}; +use crate::lw_rpc::CompactBlock; use crate::{Hash, GENERATORS_EXP}; use anyhow::Result; +use ff::BatchInverter; +use jubjub::Fq; use rustacuda::context::CurrentContext; -use crate::lw_rpc::CompactBlock; +use rustacuda::launch; +use rustacuda::prelude::*; +use std::convert::TryInto; +use std::ffi::CString; use zcash_note_encryption::Domain; use zcash_primitives::consensus::{BlockHeight, Network}; use zcash_primitives::sapling::note_encryption::SaplingDomain; -use zcash_primitives::sapling::SaplingIvk; use zcash_primitives::zip32::ExtendedFullViewingKey; -use crate::chain::{DecryptedBlock, DecryptedNote, Nf}; +use crate::db::AccountViewKey; const THREADS_PER_BLOCK: usize = 256usize; const BUFFER_SIZE: usize = 128usize; @@ -36,6 +35,7 @@ impl CudaProcessor { if let Err(ref err) = r { log::info!("CUDA Initialization Error {:?}", err); } + log::info!("CUDA Initialized"); r } @@ -43,7 +43,8 @@ impl CudaProcessor { rustacuda::init(rustacuda::CudaFlags::empty())?; let device = Device::get_device(0)?; - let context = Context::create_and_push(ContextFlags::MAP_HOST | ContextFlags::SCHED_AUTO, device)?; + let context = + Context::create_and_push(ContextFlags::MAP_HOST | ContextFlags::SCHED_AUTO, device)?; let ptx = CString::new(include_str!("../cuda/hash.ptx"))?; let hash_module = Module::load_from_string(&ptx)?; @@ -60,7 +61,7 @@ impl CudaProcessor { GENERATORS_EXP[i].copy_to_slice(&mut gens[i * 128..(i + 1) * 128]); } - let mut generators = DeviceBuffer::from_slice(&gens)?; + let generators = DeviceBuffer::from_slice(&gens)?; Ok(CudaProcessor { device, context, @@ -71,9 +72,14 @@ impl CudaProcessor { }) } + pub fn total_memory(&self) -> Result { + let mem = self.device.total_memory()?.saturating_sub(500_000_000); // leave 500 MB of GPU for other stuff; + log::info!("Cuda memory {}", mem); + Ok(mem) + } + pub fn batch_hash_cuda(&mut self, depth: u8, data: &[Hash]) -> Result> { CurrentContext::set_current(&self.context)?; - log::info!("cuda - pedersen hash"); let n = data.len() / 2; let mut in_data = DeviceBuffer::from_slice(data)?; @@ -121,103 +127,126 @@ impl CudaProcessor { Ok(out) } - pub fn trial_decrypt<'a>(&mut self, network: &Network, - account: u32, - fvk: &ExtendedFullViewingKey, - blocks: &'a [CompactBlock]) -> Result>> { - CurrentContext::set_current(&self.context)?; + pub fn trial_decrypt<'a, 'b, FVKIter: Iterator>( + &mut self, + network: &Network, + fvks: FVKIter, + blocks: &'a [CompactBlock], + ) -> Result>> { + CurrentContext::set_current(&self.context).unwrap(); - log::info!("cuda - trial decrypt"); - let ivk = fvk.fvk.vk.ivk(); - - let mut ivk_fr = ivk.0; - ivk_fr = ivk_fr.double(); // multiply by cofactor - ivk_fr = ivk_fr.double(); - ivk_fr = ivk_fr.double(); - - let n = blocks.iter().map(|b| - b.vtx.iter().map(|tx| tx.outputs.len()).sum::() - ).sum::(); + let n = blocks + .iter() + .map(|b| b.vtx.iter().map(|tx| tx.outputs.len()).sum::()) + .sum::(); let block_count = (n + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK; + if n == 0 { return Ok(vec![]); } - let mut data_buffer = vec![0u8; n*BUFFER_SIZE]; - let mut i = 0; - for b in blocks.iter() { - for tx in b.vtx.iter() { - for co in tx.outputs.iter() { - data_buffer[i*BUFFER_SIZE..i*BUFFER_SIZE+32].copy_from_slice(&co.epk); - data_buffer[i*BUFFER_SIZE+64..i*BUFFER_SIZE+116].copy_from_slice(&co.ciphertext); - i += 1; - } - } - } - - let mut ivk_device_buffer = DeviceBuffer::from_slice(&ivk_fr.to_bytes())?; - let mut data_device_buffer = DeviceBuffer::from_slice(&data_buffer)?; - - unsafe { - // Launch the kernel again using the `function` form: - let function_name = CString::new("trial_decrypt_full").unwrap(); - let trial_decrypt_full = self.trial_decrypt_module.get_function(&function_name).unwrap(); - - let stream = &self.stream; - let result = launch!(trial_decrypt_full<<<(block_count as u32, 1, 1), (THREADS_PER_BLOCK as u32, 1, 1), 0, stream>>>( - n, - ivk_device_buffer.as_device_ptr(), - data_device_buffer.as_device_ptr() - )); - result?; - } - self.stream.synchronize()?; - - data_device_buffer.copy_to(&mut data_buffer)?; - - let mut i = 0; let mut decrypted_blocks = vec![]; + // collect nullifiers for b in blocks.iter() { - let mut decrypted_notes = vec![]; let mut spends = vec![]; - let mut count_outputs: u32 = 0; - let domain = SaplingDomain::for_height(*network, BlockHeight::from_u32(b.height as u32)); - for (tx_index, tx) in b.vtx.iter().enumerate() { + let mut count_outputs = 0; + for tx in b.vtx.iter() { for cs in tx.spends.iter() { let mut nf = [0u8; 32]; nf.copy_from_slice(&cs.nf); spends.push(Nf(nf)); } - for (output_index, co) in tx.outputs.iter().enumerate() { - let plaintext = &data_buffer[i*BUFFER_SIZE+64..i*BUFFER_SIZE+116]; - if let Some((note, pa)) = domain.parse_note_plaintext_without_memo_ivk(&ivk, plaintext) { - let cmu = note.cmu().to_bytes(); - if &cmu == co.cmu.as_slice() { - decrypted_notes.push(DecryptedNote { - account, - ivk: fvk.clone(), - note, - pa, - position_in_block: count_outputs as usize, - viewonly: false, - height: b.height as u32, - txid: tx.hash.clone(), - tx_index, - output_index - }); - } - } - count_outputs += 1; - i += 1; - } + count_outputs += tx.outputs.len(); } decrypted_blocks.push(DecryptedBlock { height: b.height as u32, - notes: decrypted_notes, - count_outputs, + notes: vec![], + count_outputs: count_outputs as u32, spends, compact_block: b, - elapsed: 0 + elapsed: 0, // TODO }); } + let mut data_buffer = vec![0u8; n * BUFFER_SIZE]; + let mut i = 0; + for b in blocks.iter() { + for tx in b.vtx.iter() { + for co in tx.outputs.iter() { + data_buffer[i * BUFFER_SIZE..i * BUFFER_SIZE + 32].copy_from_slice(&co.epk); + data_buffer[i * BUFFER_SIZE + 64..i * BUFFER_SIZE + 116] + .copy_from_slice(&co.ciphertext); + i += 1; + } + } + } + + let mut data_device_buffer = DeviceBuffer::from_slice(&data_buffer).unwrap(); + + for (account, avk) in fvks { + let fvk = &avk.fvk; + let ivk = fvk.fvk.vk.ivk(); + + let mut ivk_fr = ivk.0; + ivk_fr = ivk_fr.double(); // multiply by cofactor + ivk_fr = ivk_fr.double(); + ivk_fr = ivk_fr.double(); + let mut ivk_device_buffer = DeviceBuffer::from_slice(&ivk_fr.to_bytes()).unwrap(); + + // decrypt all the blocks for the current account + unsafe { + // Launch the kernel again using the `function` form: + let function_name = CString::new("trial_decrypt_full").unwrap(); + let trial_decrypt_full = self + .trial_decrypt_module + .get_function(&function_name) + .unwrap(); + + let stream = &self.stream; + let result = launch!(trial_decrypt_full<<<(block_count as u32, 1, 1), (THREADS_PER_BLOCK as u32, 1, 1), 0, stream>>>( + n, + ivk_device_buffer.as_device_ptr(), + data_device_buffer.as_device_ptr() + )); + result.unwrap(); + } + self.stream.synchronize().unwrap(); + + data_device_buffer.copy_to(&mut data_buffer).unwrap(); + + // merge the decrypted blocks + let mut i = 0; + for db in decrypted_blocks.iter_mut() { + let b = db.compact_block; + let mut decrypted_notes = vec![]; + let mut position_in_block = 0; + let domain = + SaplingDomain::for_height(*network, BlockHeight::from_u32(b.height as u32)); + for (tx_index, tx) in b.vtx.iter().enumerate() { + for (output_index, co) in tx.outputs.iter().enumerate() { + let plaintext = &data_buffer[i * BUFFER_SIZE + 64..i * BUFFER_SIZE + 116]; + if let Some((note, pa)) = domain.parse_note_plaintext_without_memo_ivk(&ivk, plaintext) { + let cmu = note.cmu().to_bytes(); + if &cmu == co.cmu.as_slice() { + decrypted_notes.push(DecryptedNote { + account: *account, + ivk: fvk.clone(), + note, + pa, + position_in_block, + viewonly: false, + height: b.height as u32, + txid: tx.hash.clone(), + tx_index, + output_index, + }); + } + } + i += 1; + position_in_block += 1; + } + } + db.notes.extend(decrypted_notes); + } + } + Ok(decrypted_blocks) } } diff --git a/src/lib.rs b/src/lib.rs index 39660cb..daeedce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,7 @@ pub use crate::coinconfig::{ pub use crate::commitment::{CTree, Witness}; pub use crate::db::{AccountRec, DbAdapter, TxRec}; pub use crate::fountain::{put_drop, FountainCodes, RaptorQDrops}; -pub use crate::hash::{Hash, pedersen_hash, GENERATORS_EXP}; +pub use crate::hash::{pedersen_hash, Hash, GENERATORS_EXP}; pub use crate::key::{generate_random_enc_key, KeyHelpers}; pub use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient; pub use crate::lw_rpc::*; diff --git a/src/main/rpc.rs b/src/main/rpc.rs index 152ff60..4da5583 100644 --- a/src/main/rpc.rs +++ b/src/main/rpc.rs @@ -136,8 +136,15 @@ pub fn list_accounts() -> Result>, Error> { #[post("/sync?")] pub async fn sync(offset: Option) -> Result<(), Error> { let c = CoinConfig::get_active(); - warp_api_ffi::api::sync::coin_sync(c.coin, true, offset.unwrap_or(0), 50, |_| {}, &SYNC_CANCELED) - .await?; + warp_api_ffi::api::sync::coin_sync( + c.coin, + true, + offset.unwrap_or(0), + 50, + |_| {}, + &SYNC_CANCELED, + ) + .await?; Ok(()) } diff --git a/src/scan.rs b/src/scan.rs index f7a0b26..4958932 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -41,8 +41,6 @@ pub struct TxIdHeight { index: u32, } -pub const MAX_OUTPUTS_PER_CHUNK: usize = 200_000; - pub async fn sync_async( coin_type: CoinType, _chunk_size: u32, @@ -74,6 +72,7 @@ pub async fn sync_async( if start_height >= end_height { return Ok(()); } + let n_ivks = vks.len(); let decrypter = DecryptNode::new(vks, max_cost); @@ -85,6 +84,7 @@ pub async fn sync_async( log::info!("download_scheduler"); download_chain( &mut client, + n_ivks, start_height, end_height, prev_hash,