Sapling decryption and warp sync

Hanh 2022-10-27 18:10:51 +08:00
parent 3539c45771
commit 81836eae13
10 changed files with 186 additions and 48 deletions

View File

@ -128,8 +128,9 @@ pub unsafe extern "C" fn get_lwd_url(coin: u8) -> *mut c_char {
#[no_mangle]
pub unsafe extern "C" fn reset_app() {
let res = || {
crate::api::account::reset_db(0)?;
crate::api::account::reset_db(1)?;
for i in 0..MAX_COINS {
crate::api::account::reset_db(i)?;
}
Ok(())
};
log_error(res())

View File

@ -9,6 +9,7 @@ use tokio::sync::Mutex;
use tonic::transport::Channel;
use tonic::Request;
use zcash_primitives::sapling::Note;
use crate::sync::CTree;
const DEFAULT_CHUNK_SIZE: u32 = 100_000;

View File

@ -4,12 +4,12 @@ use crate::lw_rpc::*;
use crate::scan::Blocks;
use ff::PrimeField;
use futures::{future, FutureExt};
use log::info;
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use rayon::prelude::*;
use std::collections::HashMap;
use std::convert::TryInto;
use std::future::Future;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Mutex;
@ -140,13 +140,12 @@ const MAX_OUTPUTS_PER_CHUNKS: usize = 200_000;
#[allow(unused_variables)]
pub async fn download_chain(
client: &mut CompactTxStreamerClient<Channel>,
n_ivks: usize,
start_height: u32,
end_height: u32,
mut prev_hash: Option<[u8; 32]>,
max_cost: u32,
blocks_tx: Sender<Blocks>,
cancel: &'static Mutex<bool>,
handler: Sender<Blocks>,
) -> anyhow::Result<()> {
let outputs_per_chunk = get_available_memory()? / get_mem_per_output();
let outputs_per_chunk = outputs_per_chunk.min(MAX_OUTPUTS_PER_CHUNKS);
@ -210,7 +209,10 @@ pub async fn download_chain(
// output
let out = cbs;
cbs = Vec::new();
blocks_tx.send(Blocks(out, total_block_size)).await.unwrap();
let blocks = Blocks(out, total_block_size);
if !blocks.0.is_empty() {
let _ = handler.send(blocks).await;
}
output_count = 0;
total_block_size = 0;
}
@ -218,7 +220,10 @@ pub async fn download_chain(
cbs.push(block);
output_count += block_output_count;
}
let _ = blocks_tx.send(Blocks(cbs, total_block_size)).await;
let blocks = Blocks(cbs, total_block_size);
if !blocks.0.is_empty() {
let _ = handler.send(blocks).await;
}
Ok(())
}
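For context, the receiving side simply drains the channel until download_chain drops its sender; a minimal consumer sketch, reusing the Blocks tuple struct and a blocks_rx receiver created by the caller (names follow the sync_async code later in this commit):

    while let Some(Blocks(chunk, total_size)) = blocks_rx.recv().await {
        // Each batch is guaranteed non-empty by the sender above.
        log::info!("received {} blocks ({} bytes of outputs)", chunk.len(), total_size);
    }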

View File

@ -47,6 +47,7 @@ pub struct ReceivedNote {
pub spent: Option<u32>,
}
#[derive(Clone)]
pub struct ReceivedNoteShort {
pub id: u32,
pub account: u32,
@ -425,6 +426,19 @@ impl DbAdapter {
Ok(())
}
pub fn store_tree(height: u32, hash: &[u8], tree: &CTree, db_tx: &Connection, shielded_pool: &str) -> anyhow::Result<()> {
let mut bb: Vec<u8> = vec![];
tree.write(&mut bb)?;
db_tx.execute(&format!("INSERT INTO blocks(height, hash, {pool}_tree, timestamp) VALUES (?1,?2,?3,0) ON CONFLICT DO UPDATE
SET {pool}_tree = excluded.{pool}_tree", pool = shielded_pool), params![height, hash, &bb])?;
Ok(())
}
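    // Usage sketch (hedged): the {pool} placeholder selects the per-pool tree column,
    // so the same upsert serves both shielded pools. The Sapling synchronizer later in
    // this commit calls it as shown below; an Orchard caller would pass "orchard"
    // instead (assuming the blocks table has a matching orchard_tree column).
    // DbAdapter::store_tree(height, &hash, &tree, &db_tx, "sapling")?;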
pub fn store_block_timestamp(&self, height: u32, timestamp: u32) -> anyhow::Result<()> {
self.connection.execute("UPDATE blocks SET timestamp = ?1 WHERE height = ?2", params![timestamp, height])?;
Ok(())
}
pub fn store_tx_metadata(&self, id_tx: u32, tx_info: &TransactionInfo) -> anyhow::Result<()> {
self.connection.execute(
"UPDATE transactions SET address = ?1, memo = ?2 WHERE id_tx = ?3",
@ -500,7 +514,8 @@ impl DbAdapter {
}
pub fn get_tree(&self) -> anyhow::Result<(TreeCheckpoint, TreeCheckpoint)> {
self.get_tree_by_name("sapling"); // TODO: pack in TreeCheckpoint
self.get_tree_by_name("sapling")?; // TODO: pack in TreeCheckpoint
todo!()
}
pub fn get_tree_by_name(&self, shielded_pool: &str) -> anyhow::Result<TreeCheckpoint> {
@ -582,14 +597,14 @@ impl DbAdapter {
pub fn get_unspent_nullifiers(
&self,
account: u32,
) -> anyhow::Result<Vec<ReceivedNoteShort>> {
let sql = "SELECT id_note, nf, value FROM received_notes WHERE account = ?1 AND (spent IS NULL OR spent = 0)";
let sql = "SELECT id_note, account, nf, value FROM received_notes WHERE spent IS NULL OR spent = 0";
let mut statement = self.connection.prepare(sql)?;
let nfs_res = statement.query_map(params![account], |row| {
let nfs_res = statement.query_map(params![], |row| {
let id: u32 = row.get(0)?;
let nf: Vec<u8> = row.get(1)?;
let value: i64 = row.get(2)?;
let account: u32 = row.get(1)?;
let nf: Vec<u8> = row.get(2)?;
let value: i64 = row.get(3)?;
let nf: [u8; 32] = nf.try_into().unwrap();
let nf = Nf(nf);
Ok(ReceivedNoteShort {
@ -1271,8 +1286,8 @@ pub struct AccountData {
#[cfg(test)]
mod tests {
use crate::db::{DbAdapter, DEFAULT_DB_PATH, ReceivedNote};
use crate::commitment::{CTree, Witness};
use zcash_params::coin::CoinType;
use crate::sync::{CTree, Witness};
#[test]
fn test_db() {
@ -1303,10 +1318,10 @@ mod tests {
let witness = Witness {
position: 10,
id_note: 0,
note: None,
tree: CTree::new(),
filled: vec![],
cursor: CTree::new(),
cmx: [0u8; 32]
};
DbAdapter::store_witnesses(&db_tx, &witness, 1000, 1).unwrap();
db_tx.commit().unwrap();

View File

@ -121,7 +121,7 @@ mod tests {
use std::convert::TryInto;
use rand::RngCore;
use rand::rngs::OsRng;
use crate::pedersen_hash;
use crate::hash::pedersen_hash;
use crate::sapling::hash::hash_combine;
#[test]

View File

@ -4,10 +4,7 @@ use serde::Serialize;
use std::cmp::Ordering;
use crate::transaction::retrieve_tx_info;
use crate::{
connect_lightwalletd, CompactBlock, CompactSaplingOutput,
CompactTx
};
use crate::{connect_lightwalletd, CompactBlock, CompactSaplingOutput, CompactTx, DbAdapterBuilder, chain};
use crate::chain::{DecryptNode, download_chain};
use ff::PrimeField;
@ -25,6 +22,9 @@ use zcash_params::coin::{get_coin_chain, CoinType};
use zcash_primitives::consensus::{Network, Parameters};
use zcash_primitives::sapling::{Node, Note};
use zcash_primitives::sapling::note_encryption::SaplingDomain;
use crate::sapling::{DecryptedSaplingNote, SaplingDecrypter, SaplingHasher, SaplingViewKey};
use crate::sync::{CTree, Synchronizer, WarpProcessor};
pub struct Blocks(pub Vec<CompactBlock>, pub usize);
@ -58,7 +58,86 @@ pub struct TxIdHeight {
index: u32,
}
pub async fn sync_async(
type SaplingSynchronizer = Synchronizer<Network, SaplingDomain<Network>, SaplingViewKey, DecryptedSaplingNote,
SaplingDecrypter<Network>, SaplingHasher>;
pub async fn sync_async<'a>(
coin_type: CoinType,
_chunk_size: u32,
get_tx: bool,
db_path: &'a str,
target_height_offset: u32,
max_cost: u32,
progress_callback: AMProgressCallback,
cancel: &'static std::sync::Mutex<bool>,
ld_url: &'a str,
) -> anyhow::Result<()> {
let ld_url = ld_url.to_owned();
let db_path = db_path.to_owned();
let network = {
let chain = get_coin_chain(coin_type);
*chain.network()
};
let mut client = connect_lightwalletd(&ld_url).await?;
let (start_height, prev_hash, sapling_vks) = {
let db = DbAdapter::new(coin_type, &db_path)?;
let height = db.get_db_height()?;
let hash = db.get_db_hash(height)?;
let vks = db.get_fvks()?;
let sapling_vks: Vec<_> = vks.iter().map(|(&account, ak)| {
SaplingViewKey {
account,
fvk: ak.fvk.clone(),
ivk: ak.ivk.clone()
}
}).collect();
(height, hash, sapling_vks)
};
let end_height = get_latest_height(&mut client).await?;
let end_height = (end_height - target_height_offset).max(start_height);
if start_height >= end_height {
return Ok(());
}
let (blocks_tx, mut blocks_rx) = mpsc::channel::<Blocks>(1);
tokio::spawn(async move {
download_chain(&mut client, start_height, end_height, prev_hash, max_cost, cancel, blocks_tx).await?;
Ok::<_, anyhow::Error>(())
});
let db_builder = DbAdapterBuilder { coin_type, db_path: db_path.clone() };
while let Some(blocks) = blocks_rx.recv().await {
let first_block = blocks.0.first().unwrap(); // safe: the downloader never sends an empty batch
log::info!("Height: {}", first_block.height);
let last_block = blocks.0.last().unwrap();
let last_height = last_block.height as u32;
let last_timestamp = last_block.time;
let decrypter = SaplingDecrypter::new(network);
let warper = WarpProcessor::new(SaplingHasher::default());
let mut sapling_synchronizer = SaplingSynchronizer::new(
decrypter,
warper,
sapling_vks.clone(),
db_builder.clone(),
"sapling".to_string(),
);
sapling_synchronizer.initialize()?;
sapling_synchronizer.process(blocks.0)?;
// TODO - Orchard
let db = db_builder.build()?;
db.store_block_timestamp(last_height, last_timestamp)?;
}
Ok(())
}
fn sync_async_old(
coin_type: CoinType,
_chunk_size: u32,
get_tx: bool,
@ -69,6 +148,7 @@ pub async fn sync_async(
cancel: &'static std::sync::Mutex<bool>,
ld_url: &str,
) -> anyhow::Result<()> {
/*
let ld_url = ld_url.to_owned();
let db_path = db_path.to_string();
let network = {
@ -84,6 +164,13 @@ pub async fn sync_async(
let vks = db.get_fvks()?;
(height, hash, vks)
};
let saplingvks: Vec<_> = vks.iter().map(|(&account, vk)| {
SaplingViewKey {
account,
fvk: vk.fvk.clone(),
ivk: vk.ivk.clone(),
}
}).collect();
let end_height = get_latest_height(&mut client).await?;
let end_height = (end_height - target_height_offset).max(start_height);
@ -92,8 +179,6 @@ pub async fn sync_async(
}
let n_ivks = vks.len();
let decrypter = DecryptNode::new(vks);
let (decryptor_tx, mut decryptor_rx) = mpsc::channel::<Blocks>(1);
let (processor_tx, mut processor_rx) = mpsc::channel::<(Vec<DecryptedBlock>, usize)>(1);
@ -135,14 +220,12 @@ pub async fn sync_async(
let processor = tokio::spawn(async move {
let mut db = DbAdapter::new(coin_type, &db_path2)?;
let mut nfs = db.get_nullifiers()?;
while let Some((dec_blocks, blocks_size)) = processor_rx.recv().await {
if dec_blocks.is_empty() {
continue;
}
progress.downloaded += blocks_size;
let (mut sapling_checkpoint, mut orchard_checkpoint) = db.get_tree()?;
/* TODO
- Change to WarpProcessors & Trial Decryptors - sapling & orchard
- Feed block into WP sapling
@ -152,19 +235,26 @@ pub async fn sync_async(
*/
let mut bp = BlockProcessor::new(&tree, &witnesses);
let mut absolute_position_at_block_start = tree.get_position();
log::info!("start processing - {}", dec_blocks[0].height);
log::info!("Time {:?}", chrono::offset::Local::now());
let start = Instant::now();
let mut new_ids_tx: HashMap<u32, TxIdHeight> = HashMap::new();
let mut witnesses: Vec<Witness> = vec![];
let decrypter = DecryptNode::new(vks);
let mut sapling_synchronizer = SaplingSynchronizer::new(
decrypter,
warper,
saplingvks.clone(),
DbAdapterBuilder { coin_type, db_path: db_path.clone() },
"sapling".to_string(),
);
sapling_synchronizer.initialize()?;
{
// db tx scope
let db_tx = db.begin_transaction()?;
/*
let outputs = dec_blocks
.iter()
.map(|db| db.count_outputs as usize)
@ -271,12 +361,15 @@ pub async fn sync_async(
}
absolute_position_at_block_start += b.count_outputs as usize;
}
*/
log::info!("Dec end : {}", start.elapsed().as_millis());
db_tx.commit()?;
}
/*
let start = Instant::now();
let mut nodes: Vec<Node> = vec![];
for block in dec_blocks.iter() {
@ -318,22 +411,24 @@ pub async fn sync_async(
tree = new_tree;
witnesses = new_witnesses;
*/
if let Some(dec_block) = dec_blocks.last() {
{
let block = &dec_block.compact_block;
let mut db_transaction = db.begin_transaction()?;
let height = block.height as u32;
for w in witnesses.iter() {
DbAdapter::store_witnesses(&db_transaction, w, height, w.id_note)?;
}
DbAdapter::store_block(
&mut db_transaction,
height,
&block.hash,
block.time,
&tree,
None,
)?;
// for w in witnesses.iter() {
// DbAdapter::store_witnesses(&db_transaction, w, height, w.id_note)?;
// }
// DbAdapter::store_block(
// &mut db_transaction,
// height,
// &block.hash,
// block.time,
// &tree,
// None,
// )?;
db_transaction.commit()?;
// db_transaction is dropped here
}
@ -379,6 +474,8 @@ pub async fn sync_async(
log::info!("Sync completed");
*/
Ok(())
}

View File

@ -37,14 +37,31 @@ impl <N: Parameters + Sync,
DN: DecryptedNote<D, VK> + Sync,
TD: TrialDecrypter<N, D, VK, DN> + Sync,
H: Hasher> Synchronizer<N, D, VK, DN, TD, H> {
pub fn new(decrypter: TD, warper: WarpProcessor<H>, vks: Vec<VK>, db: DbAdapterBuilder, shielded_pool: String) -> Self {
Synchronizer {
decrypter,
warper,
vks,
db,
shielded_pool,
note_position: 0,
nullifiers: HashMap::default(),
tree: CTree::new(),
witnesses: vec![],
_phantom: Default::default()
}
}
pub fn initialize(&mut self) -> Result<()> {
let db = self.db.build()?;
let TreeCheckpoint { tree, witnesses } = db.get_tree_by_name(&self.shielded_pool)?;
self.tree = tree;
self.witnesses = witnesses;
self.note_position = self.tree.get_position();
for vk in self.vks.iter() {
let account = vk.account();
let nfs = db.get_unspent_nullifiers(account)?;
let nfs = db.get_unspent_nullifiers()?;
for rn in nfs.into_iter() {
self.nullifiers.insert(rn.nf.clone(), rn);
}
@ -93,6 +110,7 @@ impl <N: Parameters + Sync,
// Detect spends and collect note commitments
let mut new_cmx = vec![];
let mut height = 0;
let mut hash = [0u8; 32];
for b in blocks.iter() {
for (tx_index, tx) in b.vtx.iter().enumerate() {
for sp in self.decrypter.spends(tx).iter() {
@ -106,6 +124,7 @@ impl <N: Parameters + Sync,
new_cmx.extend(self.decrypter.outputs(tx).into_iter().map(|cob| cob.cmx));
}
height = b.height as u32;
hash.copy_from_slice(&b.hash);
}
// Run blocks through warp sync
@ -116,6 +135,7 @@ impl <N: Parameters + Sync,
for w in updated_witnesses.iter() {
DbAdapter::store_witness(w, height, w.id_note, &db_tx, &self.shielded_pool)?;
}
DbAdapter::store_tree(height, &hash, &updated_tree, &db_tx, &self.shielded_pool)?;
self.tree = updated_tree;
self.witnesses = updated_witnesses;

View File

@ -415,6 +415,7 @@ struct WitnessBuilder<H: Hasher> {
impl <H: Hasher> WitnessBuilder<H> {
fn new(tree_builder: &CTreeBuilder<H>, prev_witness: &Witness, count: usize) -> Self {
let position = prev_witness.position;
// log::info!("Witness::new - {} {},{}", position, tree_builder.start, tree_builder.start + count);
let inside = position >= tree_builder.start && position < tree_builder.start + count;
WitnessBuilder {
witness: prev_witness.clone(),
@ -434,7 +435,6 @@ impl <H: Hasher> Builder for WitnessBuilder<H> {
let depth = context.depth;
let tree = &mut self.witness.tree;
if self.inside {
let rp = self.p - context.adjusted_start(&offset);
if depth == 0 {

View File

@ -59,9 +59,9 @@ pub struct CompactOutputBytes {
impl From<&CompactSaplingOutput> for CompactOutputBytes {
fn from(co: &CompactSaplingOutput) -> Self {
CompactOutputBytes {
epk: co.epk.clone().try_into().unwrap(),
cmx: co.cmu.clone().try_into().unwrap(),
ciphertext: co.ciphertext.clone().try_into().unwrap(),
epk: if co.epk.is_empty() { [0u8; 32] } else { co.epk.clone().try_into().unwrap() },
cmx: co.cmu.clone().try_into().unwrap(), // cmu is always present; it cannot be filtered out
ciphertext: if co.ciphertext.is_empty() { [0u8; 52] } else { co.ciphertext.clone().try_into().unwrap() },
}
}
}
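A quick illustration of the new defaulting behaviour, assuming CompactSaplingOutput carries only the cmu, epk and ciphertext fields used above, and that an upstream source may strip epk and ciphertext while cmu must always be present for the commitment tree:

    let pruned = CompactSaplingOutput {
        cmu: vec![1u8; 32],
        epk: vec![],        // stripped upstream
        ciphertext: vec![], // stripped upstream
    };
    let cob = CompactOutputBytes::from(&pruned);
    assert_eq!(cob.epk, [0u8; 32]);
    assert_eq!(cob.ciphertext, [0u8; 52]);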

View File

@ -295,7 +295,6 @@ mod tests {
Network::MainNetwork.hrp_sapling_extended_full_viewing_key(),
&fvk,
)
.unwrap()
.unwrap();
let tx_info = decode_transaction(
&Network::MainNetwork,