This commit is contained in:
Hanh 2021-06-26 17:52:03 +08:00
parent 915769c47e
commit 3432ea15bd
15 changed files with 970 additions and 297 deletions

View File

@ -18,6 +18,7 @@ path = "src/main/warp_cli.rs"
dotenv = "0.15.0"
env_logger = "0.8.4"
anyhow = "1.0.40"
thiserror = "1.0.25"
log = "0.4.14"
flexi_logger = {version="0.17.1", features = ["compress"]}
serde = {version = "1.0.126", features = ["derive"]}

View File

@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;
use sync::{scan_all, NETWORK};
use tokio::runtime::Runtime;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::Parameters;

View File

@ -20,6 +20,7 @@ struct CTreeBuilder {
total_len: usize,
depth: usize,
offset: Option<Node>,
first_block: bool,
}
impl Builder<CTree, ()> for CTreeBuilder {
@ -37,8 +38,7 @@ impl Builder<CTree, ()> for CTreeBuilder {
m = commitments.len();
};
let n =
if self.total_len > 0 {
let n = if self.total_len > 0 {
if self.depth == 0 {
if m % 2 == 0 {
self.next_tree.left = Some(*Self::get(commitments, m - 2, &offset));
@ -59,8 +59,9 @@ impl Builder<CTree, ()> for CTreeBuilder {
m - 1
}
}
}
else { 0 };
} else {
0
};
assert_eq!(n % 2, 0);
self.offset = offset;
@ -106,7 +107,7 @@ impl Builder<CTree, ()> for CTreeBuilder {
}
impl CTreeBuilder {
fn new(prev_tree: CTree, len: usize) -> CTreeBuilder {
fn new(prev_tree: CTree, len: usize, first_block: bool) -> CTreeBuilder {
let start = prev_tree.get_position();
CTreeBuilder {
left: prev_tree.left,
@ -117,6 +118,7 @@ impl CTreeBuilder {
total_len: len,
depth: 0,
offset: None,
first_block,
}
}
@ -142,7 +144,7 @@ impl CTreeBuilder {
Self::get_opt(commitments, index, offset).unwrap()
}
fn adjusted_start(&self, prev: &Option<Node>, _depth: usize) -> usize {
fn adjusted_start(&self, prev: &Option<Node>) -> usize {
if prev.is_some() {
self.start - 1
} else {
@ -193,10 +195,9 @@ impl Builder<Witness, CTreeBuilder> for WitnessBuilder {
let depth = context.depth;
let tree = &mut self.witness.tree;
let right = if depth != 0 { context.right } else { None };
if self.inside {
let rp = self.p - context.adjusted_start(&offset, depth);
let rp = self.p - context.adjusted_start(&offset);
if depth == 0 {
if self.p % 2 == 1 {
tree.left = Some(*CTreeBuilder::get(commitments, rp - 1, &offset));
@ -221,10 +222,15 @@ impl Builder<Witness, CTreeBuilder> for WitnessBuilder {
// for c in commitments.iter() {
// println!("{}", hex::encode(c.repr));
// }
let right = if depth != 0 && !context.first_block {
context.right
} else {
None
};
let p1 = self.p + 1;
let has_p1 = p1 >= context.adjusted_start(&right, depth) && p1 < context.start + commitments.len();
let has_p1 = p1 >= context.adjusted_start(&right) && p1 < context.start + commitments.len();
if has_p1 {
let p1 = CTreeBuilder::get(commitments, p1 - context.adjusted_start(&right, depth), &right);
let p1 = CTreeBuilder::get(commitments, p1 - context.adjusted_start(&right), &right);
if depth == 0 {
if tree.right.is_none() {
self.witness.filled.push(*p1);
@ -289,11 +295,12 @@ impl Builder<Witness, CTreeBuilder> for WitnessBuilder {
#[allow(dead_code)]
pub fn advance_tree(
prev_tree: CTree,
prev_tree: &CTree,
prev_witnesses: &[Witness],
mut commitments: &mut [Node],
first_block: bool,
) -> (CTree, Vec<Witness>) {
let mut builder = CTreeBuilder::new(prev_tree, commitments.len());
let mut builder = CTreeBuilder::new(prev_tree.clone(), commitments.len(), first_block);
let mut witness_builders: Vec<_> = prev_witnesses
.iter()
.map(|witness| WitnessBuilder::new(&builder, witness.clone(), commitments.len()))
@ -319,43 +326,115 @@ pub fn advance_tree(
(tree, witnesses)
}
pub struct BlockProcessor {
prev_tree: CTree,
prev_witnesses: Vec<Witness>,
first_block: bool,
}
impl BlockProcessor {
pub fn new(prev_tree: &CTree, prev_witnesses: &[Witness]) -> BlockProcessor {
BlockProcessor {
prev_tree: prev_tree.clone(),
prev_witnesses: prev_witnesses.to_vec(),
first_block: true,
}
}
pub fn add_nodes(&mut self, nodes: &mut [Node], new_witnesses: &[Witness]) {
self.prev_witnesses.extend_from_slice(new_witnesses);
let (t, ws) = advance_tree(
&self.prev_tree,
&self.prev_witnesses,
nodes,
self.first_block,
);
self.prev_tree = t;
self.prev_witnesses = ws;
}
pub fn finalize(self) -> (CTree, Vec<Witness>) {
let (t, ws) = advance_tree(&self.prev_tree, &self.prev_witnesses, &mut [], false);
(t, ws)
}
}
#[cfg(test)]
#[allow(unused_imports)]
mod tests {
use crate::builder::advance_tree;
use crate::builder::{advance_tree, BlockProcessor};
use crate::chain::DecryptedNote;
use crate::commitment::{CTree, Witness};
use crate::print::{print_ctree, print_tree, print_witness, print_witness2};
use zcash_primitives::merkle_tree::{CommitmentTree, IncrementalWitness};
use zcash_primitives::sapling::Node;
use crate::chain::DecryptedNote;
use crate::print::{print_witness, print_witness2, print_tree, print_ctree};
#[test]
fn test_advance_tree_equal_blocks() {
for num_nodes in 1..=10 {
for num_chunks in 1..=10 {
test_advance_tree_helper(num_nodes, num_chunks, 100.0, None);
}
}
}
#[test]
fn test_advance_tree_unequal_blocks() {
for num_nodes1 in 1..=30 {
for num_nodes2 in 1..=30 {
println!("TESTING {} {}", num_nodes1, num_nodes2);
let (t, ws) = test_advance_tree_helper(num_nodes1, 1, 100.0, None);
test_advance_tree_helper(num_nodes2, 1, 100.0, Some((t, ws)));
}
}
}
#[test]
fn test_advance_tree() {
for num_nodes in 1..=10 {
for num_chunks in 1..=10 {
test_advance_tree_helper(num_nodes, num_chunks, 100.0);
}
}
test_advance_tree_helper(100, 50, 1.0, None);
test_advance_tree_helper(100, 50, 1.0);
// test_advance_tree_helper(2, 10, 100.0);
// test_advance_tree_helper(1, 40, 100.0);
// test_advance_tree_helper(10, 2, 100.0);
}
fn test_advance_tree_helper(num_nodes: usize, num_chunks: usize, witness_percent: f64) {
fn test_advance_tree_helper(
num_nodes: usize,
num_chunks: usize,
witness_percent: f64,
initial: Option<(CTree, Vec<Witness>)>,
) -> (CTree, Vec<Witness>) {
let witness_freq = (100.0 / witness_percent) as usize;
let mut tree1: CommitmentTree<Node> = CommitmentTree::empty();
let mut tree2 = CTree::new();
let mut ws: Vec<IncrementalWitness<Node>> = vec![];
let mut ws2: Vec<Witness> = vec![];
if let Some((t0, ws0)) = initial {
tree2 = t0;
ws2 = ws0;
let mut bb: Vec<u8> = vec![];
tree2.write(&mut bb).unwrap();
tree1 = CommitmentTree::<Node>::read(&*bb).unwrap();
for w in ws2.iter() {
bb = vec![];
w.write(&mut bb).unwrap();
let w1 = IncrementalWitness::<Node>::read(&*bb).unwrap();
ws.push(w1);
}
}
let p0 = tree2.get_position();
let mut bp = BlockProcessor::new(&tree2, &ws2);
for i in 0..num_chunks {
println!("{}", i);
let mut nodes: Vec<_> = vec![];
let mut ws2: Vec<Witness> = vec![];
for j in 0..num_nodes {
let mut bb = [0u8; 32];
let v = i * num_nodes + j;
let v = i * num_nodes + j + p0;
bb[0..8].copy_from_slice(&v.to_be_bytes());
let node = Node::new(bb);
tree1.append(node).unwrap();
@ -371,15 +450,10 @@ mod tests {
nodes.push(node);
}
let (new_tree, new_witnesses) = advance_tree(tree2, &ws2, &mut nodes);
tree2 = new_tree;
ws2 = new_witnesses;
bp.add_nodes(&mut nodes, &ws2);
}
// Push an empty block
// It will calculate the tail of the tree
// This step is required at the end of a series of chunks
let (new_tree, new_witnesses) = advance_tree(tree2, &ws2, &mut []);
let (new_tree, new_witnesses) = bp.finalize();
tree2 = new_tree;
ws2 = new_witnesses;
@ -411,18 +485,20 @@ mod tests {
if bb1.as_slice() != bb2.as_slice() {
failed_index = Some(i);
println!("FAILED AT {}", i);
if let Some(ref c) = w1.cursor {
print_tree(c);
}
else { println!("NONE"); }
println!("GOOD");
print_witness(&w1);
if let Some(ref c) = w1.cursor {
print_tree(c);
} else {
println!("NONE");
}
println!("BAD");
print_witness2(&w2);
}
}
assert!(equal && failed_index.is_none());
}
(tree2, ws2)
}
}

View File

@ -1,24 +1,25 @@
use crate::commitment::{CTree, Witness};
use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient;
use crate::lw_rpc::*;
use crate::{NETWORK, advance_tree};
use crate::{advance_tree, NETWORK};
use ff::PrimeField;
use group::GroupEncoding;
use log::info;
use rayon::prelude::*;
use tonic::transport::{Channel, Certificate, ClientTlsConfig};
use std::time::Instant;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use zcash_primitives::consensus::{BlockHeight, Parameters, NetworkUpgrade};
use thiserror::Error;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{BlockHeight, NetworkUpgrade, Parameters};
use zcash_primitives::merkle_tree::{CommitmentTree, IncrementalWitness};
use zcash_primitives::sapling::note_encryption::try_sapling_compact_note_decryption;
use zcash_primitives::sapling::{Node, Note, PaymentAddress};
use zcash_primitives::transaction::components::sapling::CompactOutputDescription;
use crate::commitment::{CTree, Witness};
use std::time::Instant;
use log::info;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::zip32::ExtendedFullViewingKey;
const MAX_CHUNK: u32 = 50000;
// pub const LWD_URL: &str = "https://mainnet.lightwalletd.com:9067";
// pub const LWD_URL: &str = "https://testnet.lightwalletd.com:9067";
// pub const LWD_URL: &str = "http://lwd.hanh.me:9067";
// pub const LWD_URL: &str = "https://lwdv3.zecwallet.co";
pub const LWD_URL: &str = "http://127.0.0.1:9067";
@ -32,11 +33,20 @@ pub async fn get_latest_height(
Ok(block_id.height as u32)
}
#[derive(Error, Debug)]
pub enum ChainError {
#[error("Blockchain reorganization")]
Reorg,
#[error("Synchronizer busy")]
Busy,
}
/* download [start_height+1, end_height] inclusive */
pub async fn download_chain(
client: &mut CompactTxStreamerClient<Channel>,
start_height: u32,
end_height: u32,
mut prev_hash: Option<[u8; 32]>,
) -> anyhow::Result<Vec<CompactBlock>> {
let mut cbs: Vec<CompactBlock> = Vec::new();
let mut s = start_height + 1;
@ -57,6 +67,12 @@ pub async fn download_chain(
.await?
.into_inner();
while let Some(block) = block_stream.message().await? {
if prev_hash.is_some() && block.prev_hash.as_slice() != prev_hash.unwrap() {
anyhow::bail!(ChainError::Reorg);
}
let mut ph = [0u8; 32];
ph.copy_from_slice(&block.hash);
prev_hash = Some(ph);
cbs.push(block);
}
s = e + 1;
@ -68,14 +84,15 @@ pub struct DecryptNode {
fvks: Vec<ExtendedFullViewingKey>,
}
#[derive(Eq, Hash, PartialEq)]
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
pub struct Nf(pub [u8; 32]);
pub struct DecryptedBlock {
pub struct DecryptedBlock<'a> {
pub height: u32,
pub notes: Vec<DecryptedNote>,
pub count_outputs: u32,
pub spends: Vec<Nf>,
pub compact_block: &'a CompactBlock,
}
#[derive(Clone)]
@ -91,7 +108,25 @@ pub struct DecryptedNote {
pub output_index: usize,
}
fn decrypt_notes(block: &CompactBlock, fvks: &[ExtendedFullViewingKey]) -> DecryptedBlock {
pub fn to_output_description(co: &CompactOutput) -> CompactOutputDescription {
let mut cmu = [0u8; 32];
cmu.copy_from_slice(&co.cmu);
let cmu = bls12_381::Scalar::from_repr(cmu).unwrap();
let mut epk = [0u8; 32];
epk.copy_from_slice(&co.epk);
let epk = jubjub::ExtendedPoint::from_bytes(&epk).unwrap();
let od = CompactOutputDescription {
epk,
cmu,
enc_ciphertext: co.ciphertext.to_vec(),
};
od
}
fn decrypt_notes<'a>(
block: &'a CompactBlock,
fvks: &[ExtendedFullViewingKey],
) -> DecryptedBlock<'a> {
let height = BlockHeight::from_u32(block.height as u32);
let mut count_outputs = 0u32;
let mut spends: Vec<Nf> = vec![];
@ -104,19 +139,9 @@ fn decrypt_notes(block: &CompactBlock, fvks: &[ExtendedFullViewingKey]) -> Decry
}
for (output_index, co) in vtx.outputs.iter().enumerate() {
let mut cmu = [0u8; 32];
cmu.copy_from_slice(&co.cmu);
let cmu = bls12_381::Scalar::from_repr(cmu).unwrap();
let mut epk = [0u8; 32];
epk.copy_from_slice(&co.epk);
let epk = jubjub::ExtendedPoint::from_bytes(&epk).unwrap();
let od = CompactOutputDescription {
epk,
cmu,
enc_ciphertext: co.ciphertext.to_vec(),
};
for fvk in fvks.iter() {
let ivk = &fvk.fvk.vk.ivk();
let od = to_output_description(co);
if let Some((note, pa)) =
try_sapling_compact_note_decryption(&NETWORK, height, ivk, &od)
{
@ -140,6 +165,7 @@ fn decrypt_notes(block: &CompactBlock, fvks: &[ExtendedFullViewingKey]) -> Decry
spends,
notes,
count_outputs,
compact_block: block,
}
}
@ -148,7 +174,7 @@ impl DecryptNode {
DecryptNode { fvks }
}
pub fn decrypt_blocks(&self, blocks: &[CompactBlock]) -> Vec<DecryptedBlock> {
pub fn decrypt_blocks<'a>(&self, blocks: &'a [CompactBlock]) -> Vec<DecryptedBlock<'a>> {
let mut decrypted_blocks: Vec<DecryptedBlock> = blocks
.par_iter()
.map(|b| decrypt_notes(b, &self.fvks))
@ -172,12 +198,19 @@ async fn get_tree_state(client: &mut CompactTxStreamerClient<Channel>, height: u
rep.tree
}
pub async fn send_transaction(client: &mut CompactTxStreamerClient<Channel>, raw_tx: &[u8], height: u32) -> anyhow::Result<String> {
pub async fn send_transaction(
client: &mut CompactTxStreamerClient<Channel>,
raw_tx: &[u8],
height: u32,
) -> anyhow::Result<String> {
let raw_tx = RawTransaction {
data: raw_tx.to_vec(),
height: height as u64
height: height as u64,
};
let rep = client.send_transaction(Request::new(raw_tx)).await?.into_inner();
let rep = client
.send_transaction(Request::new(raw_tx))
.await?
.into_inner();
Ok(rep.error_message)
}
@ -257,10 +290,17 @@ pub fn calculate_tree_state_v2(cbs: &[CompactBlock], blocks: &[DecryptedBlock])
}
}
}
info!("Build CMU list: {} ms - {} nodes", start.elapsed().as_millis(), nodes.len());
info!(
"Build CMU list: {} ms - {} nodes",
start.elapsed().as_millis(),
nodes.len()
);
let witnesses: Vec<_> = positions.iter().map(|p| Witness::new(*p, 0, None)).collect();
let (_, new_witnesses) = advance_tree(CTree::new(), &witnesses, &mut nodes);
let witnesses: Vec<_> = positions
.iter()
.map(|p| Witness::new(*p, 0, None))
.collect();
let (_, new_witnesses) = advance_tree(&CTree::new(), &witnesses, &mut nodes, true);
info!("Tree State & Witnesses: {} ms", start.elapsed().as_millis());
new_witnesses
}
@ -291,7 +331,7 @@ pub async fn sync(ivk: &str) -> anyhow::Result<()> {
let end_height = get_latest_height(&mut client).await?;
let start = Instant::now();
let cbs = download_chain(&mut client, start_height, end_height).await?;
let cbs = download_chain(&mut client, start_height, end_height, None).await?;
eprintln!("Download chain: {} ms", start.elapsed().as_millis());
let start = Instant::now();
@ -314,15 +354,18 @@ pub async fn sync(ivk: &str) -> anyhow::Result<()> {
#[cfg(test)]
mod tests {
use crate::chain::LWD_URL;
#[allow(unused_imports)]
use crate::chain::{download_chain, get_latest_height, get_tree_state, calculate_tree_state_v1, calculate_tree_state_v2, DecryptNode};
use crate::chain::{
calculate_tree_state_v1, calculate_tree_state_v2, download_chain, get_latest_height,
get_tree_state, DecryptNode,
};
use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient;
use crate::NETWORK;
use dotenv;
use std::time::Instant;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use crate::chain::LWD_URL;
#[tokio::test]
async fn test_get_latest_height() -> anyhow::Result<()> {
@ -350,7 +393,7 @@ mod tests {
let end_height = get_latest_height(&mut client).await?;
let start = Instant::now();
let cbs = download_chain(&mut client, start_height, end_height).await?;
let cbs = download_chain(&mut client, start_height, end_height, None).await?;
eprintln!("Download chain: {} ms", start.elapsed().as_millis());
let start = Instant::now();

View File

@ -1,9 +1,9 @@
use crate::chain::DecryptedNote;
use byteorder::WriteBytesExt;
use std::io::{Write, Read};
use zcash_primitives::merkle_tree::{Hashable, CommitmentTree};
use std::io::{Read, Write};
use zcash_primitives::merkle_tree::{CommitmentTree, Hashable};
use zcash_primitives::sapling::Node;
use zcash_primitives::serialize::{Optional, Vector};
use crate::chain::DecryptedNote;
/*
Same behavior and structure as CommitmentTree<Node> from librustzcash
@ -177,7 +177,8 @@ impl CTree {
pub fn clone_trimmed(&self, depth: usize) -> CTree {
let mut tree = self.clone();
tree.parents.truncate(depth);
if let Some(None) = tree.parents.last() { // Remove trailing None
if let Some(None) = tree.parents.last() {
// Remove trailing None
tree.parents.truncate(depth - 1);
}
tree

320
src/db.rs
View File

@ -1,11 +1,14 @@
use rusqlite::{Connection, params, OptionalExtension, NO_PARAMS};
use crate::{Witness, CTree};
use zcash_primitives::sapling::{Note, Diversifier, Rseed, Node};
use zcash_primitives::zip32::ExtendedFullViewingKey;
use zcash_primitives::merkle_tree::IncrementalWitness;
use std::collections::HashMap;
use crate::chain::Nf;
use zcash_primitives::consensus::{Parameters, NetworkUpgrade};
use crate::{CTree, Witness};
use rusqlite::{params, Connection, OptionalExtension, NO_PARAMS};
use std::collections::HashMap;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use zcash_primitives::merkle_tree::IncrementalWitness;
use zcash_primitives::sapling::{Diversifier, Node, Note, Rseed};
use zcash_primitives::zip32::ExtendedFullViewingKey;
#[allow(dead_code)]
pub const DEFAULT_DB_PATH: &str = "zec.db";
pub struct DbAdapter {
connection: Connection,
@ -30,30 +33,41 @@ pub struct SpendableNote {
impl DbAdapter {
pub fn new(db_path: &str) -> anyhow::Result<DbAdapter> {
let connection = Connection::open(db_path)?;
Ok(DbAdapter {
connection,
})
Ok(DbAdapter { connection })
}
pub fn init_db(&self) -> anyhow::Result<()> {
self.connection.execute("CREATE TABLE IF NOT EXISTS accounts (
self.connection.execute(
"CREATE TABLE IF NOT EXISTS accounts (
id_account INTEGER PRIMARY KEY,
sk TEXT NOT NULL UNIQUE,
ivk TEXT NOT NULL,
address TEXT NOT NULL)", NO_PARAMS)?;
address TEXT NOT NULL)",
NO_PARAMS,
)?;
self.connection.execute("CREATE TABLE IF NOT EXISTS blocks (
self.connection.execute(
"CREATE TABLE IF NOT EXISTS blocks (
height INTEGER PRIMARY KEY,
hash BLOB NOT NULL,
sapling_tree BLOB NOT NULL)", NO_PARAMS)?;
timestamp INTEGER NOT NULL,
sapling_tree BLOB NOT NULL)",
NO_PARAMS,
)?;
self.connection.execute("CREATE TABLE IF NOT EXISTS transactions (
self.connection.execute(
"CREATE TABLE IF NOT EXISTS transactions (
id_tx INTEGER PRIMARY KEY,
txid BLOB NOT NULL UNIQUE,
height INTEGER,
tx_index INTEGER)", NO_PARAMS)?;
height INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
value INTEGER NOT NULL,
tx_index INTEGER)",
NO_PARAMS,
)?;
self.connection.execute("CREATE TABLE IF NOT EXISTS received_notes (
self.connection.execute(
"CREATE TABLE IF NOT EXISTS received_notes (
id_note INTEGER PRIMARY KEY,
position INTEGER NOT NULL,
tx INTEGER NOT NULL,
@ -64,83 +78,185 @@ impl DbAdapter {
rcm BLOB NOT NULL,
nf BLOB NOT NULL UNIQUE,
spent INTEGER,
CONSTRAINT tx_output UNIQUE (tx, output_index))", NO_PARAMS)?;
CONSTRAINT tx_output UNIQUE (tx, output_index))",
NO_PARAMS,
)?;
self.connection.execute("CREATE TABLE IF NOT EXISTS sapling_witnesses (
self.connection.execute(
"CREATE TABLE IF NOT EXISTS sapling_witnesses (
id_witness INTEGER PRIMARY KEY,
note INTEGER NOT NULL,
height INTEGER NOT NULL,
witness BLOB NOT NULL,
CONSTRAINT witness_height UNIQUE (note, height))", NO_PARAMS)?;
CONSTRAINT witness_height UNIQUE (note, height))",
NO_PARAMS,
)?;
Ok(())
}
pub fn store_account(&self, sk: &str, ivk: &str, address: &str) -> anyhow::Result<()> {
self.connection.execute("INSERT INTO accounts(sk, ivk, address) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING", params![sk, ivk, address])?;
self.connection.execute(
"INSERT INTO accounts(sk, ivk, address) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING",
params![sk, ivk, address],
)?;
Ok(())
}
pub fn has_account(&self, account: u32) -> anyhow::Result<bool> {
let r: Option<i32> = self
.connection
.query_row(
"SELECT 1 FROM accounts WHERE id_account = ?1",
params![account],
|row| row.get(0),
)
.optional()?;
Ok(r.is_some())
}
pub fn trim_to_height(&mut self, height: u32) -> anyhow::Result<()> {
let tx = self.connection.transaction()?;
tx.execute("DELETE FROM blocks WHERE height >= ?1", params![height])?;
tx.execute("DELETE FROM sapling_witnesses WHERE height >= ?1", params![height])?;
tx.execute("DELETE FROM received_notes WHERE height >= ?1", params![height])?;
tx.execute("DELETE FROM transactions WHERE height >= ?1", params![height])?;
tx.execute(
"DELETE FROM sapling_witnesses WHERE height >= ?1",
params![height],
)?;
tx.execute(
"DELETE FROM received_notes WHERE height >= ?1",
params![height],
)?;
tx.execute(
"DELETE FROM transactions WHERE height >= ?1",
params![height],
)?;
tx.commit()?;
Ok(())
}
pub fn store_block(&self, height: u32, hash: &[u8], tree: &CTree) -> anyhow::Result<()> {
log::info!("+block");
pub fn store_block(
&self,
height: u32,
hash: &[u8],
timestamp: u32,
tree: &CTree,
) -> anyhow::Result<()> {
log::debug!("+block");
let mut bb: Vec<u8> = vec![];
tree.write(&mut bb)?;
self.connection.execute("INSERT INTO blocks(height, hash, sapling_tree)
VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING", params![height, hash, &bb])?;
log::info!("-block");
self.connection.execute(
"INSERT INTO blocks(height, hash, timestamp, sapling_tree)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT DO NOTHING",
params![height, hash, timestamp, &bb],
)?;
log::debug!("-block");
Ok(())
}
pub fn store_transaction(&self, txid: &[u8], height: u32, tx_index: u32) -> anyhow::Result<u32> {
log::info!("+transaction");
self.connection.execute("INSERT INTO transactions(txid, height, tx_index)
VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING", params![txid, height, tx_index])?;
let id_tx: u32 = self.connection.query_row("SELECT id_tx FROM transactions WHERE txid = ?1", params![txid], |row| row.get(0))?;
log::info!("-transaction {}", id_tx);
pub fn store_transaction(
&self,
txid: &[u8],
height: u32,
timestamp: u32,
tx_index: u32,
) -> anyhow::Result<u32> {
log::debug!("+transaction");
self.connection.execute(
"INSERT INTO transactions(txid, height, timestamp, tx_index, value)
VALUES (?1, ?2, ?3, ?4, 0)
ON CONFLICT DO NOTHING",
params![txid, height, timestamp, tx_index],
)?;
let id_tx: u32 = self.connection.query_row(
"SELECT id_tx FROM transactions WHERE txid = ?1",
params![txid],
|row| row.get(0),
)?;
log::debug!("-transaction {}", id_tx);
Ok(id_tx)
}
pub fn store_received_note(&self, note: &ReceivedNote, id_tx: u32, position: usize) -> anyhow::Result<u32> {
log::info!("+received_note {}", id_tx);
pub fn store_received_note(
&self,
note: &ReceivedNote,
id_tx: u32,
position: usize,
) -> anyhow::Result<u32> {
log::debug!("+received_note {}", id_tx);
self.connection.execute("INSERT INTO received_notes(tx, height, position, output_index, diversifier, value, rcm, nf, spent)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
ON CONFLICT DO NOTHING", params![id_tx, note.height, position as u32, note.output_index, note.diversifier, note.value as i64, note.rcm, note.nf, note.spent])?;
let id_note: u32 = self.connection.query_row("SELECT id_note FROM received_notes WHERE tx = ?1 AND output_index = ?2", params![id_tx, note.output_index], |row| row.get(0))?;
log::info!("-received_note");
let id_note: u32 = self.connection.query_row(
"SELECT id_note FROM received_notes WHERE tx = ?1 AND output_index = ?2",
params![id_tx, note.output_index],
|row| row.get(0),
)?;
log::debug!("-received_note");
Ok(id_note)
}
pub fn store_witnesses(&self, witness: &Witness, height: u32, id_note: u32) -> anyhow::Result<()> {
log::info!("+witnesses");
pub fn store_witnesses(
&self,
witness: &Witness,
height: u32,
id_note: u32,
) -> anyhow::Result<()> {
log::debug!("+witnesses");
let mut bb: Vec<u8> = vec![];
witness.write(&mut bb)?;
self.connection.execute("INSERT INTO sapling_witnesses(note, height, witness) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING", params![id_note, height, bb])?;
log::info!("-witnesses");
self.connection.execute(
"INSERT INTO sapling_witnesses(note, height, witness) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING",
params![id_note, height, bb],
)?;
log::debug!("-witnesses");
Ok(())
}
pub fn add_value(&self, id_tx: u32, value: i64) -> anyhow::Result<()> {
self.connection.execute(
"UPDATE transactions SET value = value + ?2 WHERE id_tx = ?1",
params![id_tx, value],
)?;
Ok(())
}
pub fn get_received_note_value(&self, nf: &Nf) -> anyhow::Result<i64> {
let value: i64 = self.connection.query_row(
"SELECT value FROM received_notes WHERE nf = ?1",
params![nf.0.to_vec()],
|row| row.get(0),
)?;
Ok(value)
}
pub fn get_balance(&self) -> anyhow::Result<u64> {
let balance: Option<i64> = self.connection.query_row("SELECT SUM(value) FROM received_notes WHERE spent IS NULL", NO_PARAMS, |row| row.get(0))?;
let balance: Option<i64> = self.connection.query_row(
"SELECT SUM(value) FROM received_notes WHERE spent IS NULL",
NO_PARAMS,
|row| row.get(0),
)?;
Ok(balance.unwrap_or(0) as u64)
}
pub fn get_spendable_balance(&self, anchor_height: u32) -> anyhow::Result<u64> {
let balance: Option<i64> = self.connection.query_row(
"SELECT SUM(value) FROM received_notes WHERE spent IS NULL AND height <= ?1",
params![anchor_height],
|row| row.get(0),
)?;
Ok(balance.unwrap_or(0) as u64)
}
pub fn get_last_sync_height(&self) -> anyhow::Result<Option<u32>> {
let height: Option<u32> = self.connection.query_row("SELECT MAX(height) FROM blocks", NO_PARAMS, |row| row.get(0))?;
let height: Option<u32> =
self.connection
.query_row("SELECT MAX(height) FROM blocks", NO_PARAMS, |row| {
row.get(0)
})?;
Ok(height)
}
@ -154,6 +270,15 @@ impl DbAdapter {
Ok(height)
}
pub fn get_db_hash(&self, height: u32) -> anyhow::Result<Option<[u8; 32]>> {
let hash: Option<Vec<u8>> = self.connection.query_row("SELECT hash FROM blocks WHERE height = ?1", params![height], |row| row.get(0)).optional()?;
Ok(hash.map(|h| {
let mut hash = [0u8; 32];
hash.copy_from_slice(&h);
hash
}))
}
pub fn get_tree(&self) -> anyhow::Result<(CTree, Vec<Witness>)> {
let res = self.connection.query_row(
"SELECT height, sapling_tree FROM blocks WHERE height = (SELECT MAX(height) FROM blocks)",
@ -166,7 +291,7 @@ impl DbAdapter {
Some((height, tree)) => {
let tree = CTree::read(&*tree)?;
let mut statement = self.connection.prepare(
"SELECT id_note, position, witness FROM sapling_witnesses w, received_notes n WHERE w.height = ?1 AND w.note = n.id_note")?;
"SELECT id_note, position, witness FROM sapling_witnesses w, received_notes n WHERE w.height = ?1 AND w.note = n.id_note AND n.spent IS NULL")?;
let ws = statement.query_map(params![height], |row| {
let id_note: u32 = row.get(0)?;
let position: u32 = row.get(1)?;
@ -178,14 +303,15 @@ impl DbAdapter {
witnesses.push(w?);
}
(tree, witnesses)
},
None => (CTree::new(), vec![])
}
None => (CTree::new(), vec![]),
})
}
pub fn get_nullifiers(&self) -> anyhow::Result<HashMap<Nf, u32>> {
let mut statement = self.connection.prepare(
"SELECT id_note, nf FROM received_notes WHERE spent = 0")?;
let mut statement = self
.connection
.prepare("SELECT id_note, nf FROM received_notes WHERE spent IS NULL")?;
let nfs_res = statement.query_map(NO_PARAMS, |row| {
let id_note: u32 = row.get(0)?;
let nf_vec: Vec<u8> = row.get(1)?;
@ -202,7 +328,29 @@ impl DbAdapter {
Ok(nfs)
}
pub fn get_spendable_notes(&self, anchor_height: u32, fvk: &ExtendedFullViewingKey) -> anyhow::Result<Vec<SpendableNote>> {
pub fn get_nullifier_amounts(&self) -> anyhow::Result<HashMap<Vec<u8>, u64>> {
let mut statement = self
.connection
.prepare("SELECT value, nf FROM received_notes WHERE spent IS NULL")?;
let nfs_res = statement.query_map(NO_PARAMS, |row| {
let amount: i64 = row.get(0)?;
let nf: Vec<u8> = row.get(1)?;
Ok((amount, nf))
})?;
let mut nfs: HashMap<Vec<u8>, u64> = HashMap::new();
for n in nfs_res {
let n = n?;
nfs.insert(n.1, n.0 as u64);
}
Ok(nfs)
}
pub fn get_spendable_notes(
&self,
anchor_height: u32,
fvk: &ExtendedFullViewingKey,
) -> anyhow::Result<Vec<SpendableNote>> {
let mut statement = self.connection.prepare(
"SELECT diversifier, value, rcm, witness FROM received_notes r, sapling_witnesses w WHERE spent IS NULL
AND w.height = (
@ -228,7 +376,7 @@ impl DbAdapter {
Ok(SpendableNote {
note,
diversifier,
witness
witness,
})
})?;
let mut spendable_notes: Vec<SpendableNote> = vec![];
@ -240,57 +388,71 @@ impl DbAdapter {
}
pub fn mark_spent(&self, id: u32, height: u32) -> anyhow::Result<()> {
log::info!("+mark_spent");
self.connection.execute("UPDATE received_notes SET spent = ?1 WHERE id_note = ?2", params![height, id])?;
log::info!("-mark_spent");
log::debug!("+mark_spent");
self.connection.execute(
"UPDATE received_notes SET spent = ?1 WHERE id_note = ?2",
params![height, id],
)?;
log::debug!("-mark_spent");
Ok(())
}
pub fn get_sk(&self, account: u32) -> anyhow::Result<String> {
log::info!("+get_sk");
let ivk = self.connection.query_row("SELECT sk FROM accounts WHERE id_account = ?1", params![account], |row | {
log::debug!("+get_sk");
let ivk = self.connection.query_row(
"SELECT sk FROM accounts WHERE id_account = ?1",
params![account],
|row| {
let ivk: String = row.get(0)?;
Ok(ivk)
})?;
log::info!("-get_sk");
},
)?;
log::debug!("-get_sk");
Ok(ivk)
}
pub fn get_ivk(&self, account: u32) -> anyhow::Result<String> {
log::info!("+get_ivk");
let ivk = self.connection.query_row("SELECT ivk FROM accounts WHERE id_account = ?1", params![account], |row | {
log::debug!("+get_ivk");
let ivk = self.connection.query_row(
"SELECT ivk FROM accounts WHERE id_account = ?1",
params![account],
|row| {
let ivk: String = row.get(0)?;
Ok(ivk)
})?;
log::info!("-get_ivk");
},
)?;
log::debug!("-get_ivk");
Ok(ivk)
}
}
#[cfg(test)]
mod tests {
use crate::db::{DbAdapter, ReceivedNote};
use crate::{Witness, CTree};
const DB_PATH: &str = "zec.db";
use crate::db::{DbAdapter, ReceivedNote, DEFAULT_DB_PATH};
use crate::{CTree, Witness};
#[test]
fn test_db() {
let mut db = DbAdapter::new(DB_PATH).unwrap();
let mut db = DbAdapter::new(DEFAULT_DB_PATH).unwrap();
db.init_db().unwrap();
db.trim_to_height(0).unwrap();
db.store_block(1, &[0u8; 32], &CTree::new()).unwrap();
let id_tx = db.store_transaction(&[0; 32], 1, 20).unwrap();
db.store_received_note(&ReceivedNote {
db.store_block(1, &[0u8; 32], 0, &CTree::new()).unwrap();
let id_tx = db.store_transaction(&[0; 32], 1, 0, 20).unwrap();
db.store_received_note(
&ReceivedNote {
height: 1,
output_index: 0,
diversifier: vec![],
value: 0,
rcm: vec![],
nf: vec![],
spent: None
}, id_tx, 5).unwrap();
spent: None,
},
id_tx,
5,
)
.unwrap();
let witness = Witness {
position: 10,
id_note: 0,
@ -304,7 +466,7 @@ mod tests {
#[test]
fn test_balance() {
let db = DbAdapter::new(DB_PATH).unwrap();
let db = DbAdapter::new(DEFAULT_DB_PATH).unwrap();
let balance = db.get_balance().unwrap();
println!("{}", balance);
}

View File

@ -1,9 +1,12 @@
use bip39::{Language, Mnemonic, Seed};
use zcash_primitives::zip32::{ExtendedSpendingKey, ExtendedFullViewingKey, ChildIndex};
use crate::NETWORK;
use zcash_primitives::consensus::Parameters;
use zcash_client_backend::encoding::{encode_extended_spending_key, encode_extended_full_viewing_key, encode_payment_address, decode_extended_spending_key, decode_extended_full_viewing_key};
use anyhow::anyhow;
use bip39::{Language, Mnemonic, Seed};
use zcash_client_backend::encoding::{
decode_extended_full_viewing_key, decode_extended_spending_key,
encode_extended_full_viewing_key, encode_extended_spending_key, encode_payment_address,
};
use zcash_primitives::consensus::Parameters;
use zcash_primitives::zip32::{ChildIndex, ExtendedFullViewingKey, ExtendedSpendingKey};
pub fn get_secret_key(seed: &str) -> anyhow::Result<String> {
let mnemonic = Mnemonic::from_phrase(&seed, Language::English)?;
@ -15,20 +18,28 @@ pub fn get_secret_key(seed: &str) -> anyhow::Result<String> {
ChildIndex::Hardened(0),
];
let extsk = ExtendedSpendingKey::from_path(&master, &path);
let spending_key = encode_extended_spending_key(NETWORK.hrp_sapling_extended_spending_key(), &extsk);
let spending_key =
encode_extended_spending_key(NETWORK.hrp_sapling_extended_spending_key(), &extsk);
Ok(spending_key)
}
pub fn get_viewing_key(secret_key: &str) -> anyhow::Result<String> {
let extsk = decode_extended_spending_key(NETWORK.hrp_sapling_extended_spending_key(), secret_key)?.ok_or(anyhow!("Invalid Secret Key"))?;
let extsk =
decode_extended_spending_key(NETWORK.hrp_sapling_extended_spending_key(), secret_key)?
.ok_or(anyhow!("Invalid Secret Key"))?;
let fvk = ExtendedFullViewingKey::from(&extsk);
let viewing_key = encode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &fvk);
let viewing_key =
encode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &fvk);
Ok(viewing_key)
}
pub fn get_address(viewing_key: &str) -> anyhow::Result<String> {
let fvk = decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &viewing_key)?.ok_or(anyhow!("Invalid Viewing Key"))?;
let fvk = decode_extended_full_viewing_key(
NETWORK.hrp_sapling_extended_full_viewing_key(),
&viewing_key,
)?
.ok_or(anyhow!("Invalid Viewing Key"))?;
let (_, payment_address) = fvk.default_address().unwrap();
let address = encode_payment_address(NETWORK.hrp_sapling_payment_address(), &payment_address);
Ok(address)

View File

@ -8,21 +8,23 @@ pub const NETWORK: Network = Network::TestNetwork;
mod builder;
mod chain;
mod commitment;
mod scan;
mod key;
mod db;
mod wallet;
mod key;
mod mempool;
mod print;
mod scan;
mod wallet;
pub use crate::builder::advance_tree;
pub use crate::chain::{
calculate_tree_state_v2, connect_lightwalletd, download_chain, get_latest_height, sync,
DecryptNode, LWD_URL,
DecryptNode, LWD_URL, ChainError
};
pub use crate::commitment::{CTree, Witness};
pub use crate::db::DbAdapter;
pub use crate::key::{get_address, get_secret_key, get_viewing_key};
pub use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient;
pub use crate::lw_rpc::*;
pub use crate::scan::{scan_all, sync_async, latest_height};
pub use crate::key::{get_secret_key, get_address, get_viewing_key};
pub use crate::db::DbAdapter;
pub use crate::wallet::{Wallet, DEFAULT_ACCOUNT};
pub use crate::mempool::MemPool;
pub use crate::scan::{latest_height, scan_all, sync_async};
pub use crate::wallet::{Wallet, WalletBalance, DEFAULT_ACCOUNT};

View File

@ -32,6 +32,7 @@ fn test_increasing_notes() {
let mut _ws: Vec<IncrementalWitness<Node>> = vec![];
let mut ws2: Vec<Witness> = vec![];
let start = Instant::now();
let mut first_block = true;
for i in 0..NUM_CHUNKS {
eprintln!("{}, {}", i, start.elapsed().as_millis());
let mut nodes: Vec<_> = vec![];
@ -51,11 +52,14 @@ fn test_increasing_notes() {
}
nodes.push(node);
}
let (new_tree, new_witnesses) = advance_tree(tree2, &ws2, &mut nodes);
let (new_tree, new_witnesses) = advance_tree(&tree2, &ws2, &mut nodes, first_block);
first_block = false;
tree2 = new_tree;
ws2 = new_witnesses;
}
let (_, new_witnesses) = advance_tree(&tree2, &ws2, &mut [], false);
ws2 = new_witnesses;
println!("# witnesses = {}", ws2.len());
}
@ -78,6 +82,7 @@ fn test_increasing_gap(run_normal: bool, run_warp: bool) {
// Add our received notes
let mut pos = 0usize;
let mut nodes: Vec<_> = vec![];
let mut first_block = true;
for _ in 0..NUM_WITNESS {
let node = mk_node(pos);
if run_normal {
@ -94,7 +99,8 @@ fn test_increasing_gap(run_normal: bool, run_warp: bool) {
}
if run_warp {
let (new_tree, new_witnesses) = advance_tree(tree2, &ws2, &mut nodes);
let (new_tree, new_witnesses) = advance_tree(&tree2, &ws2, &mut nodes, first_block);
first_block = false;
tree2 = new_tree;
ws2 = new_witnesses;
}
@ -116,7 +122,7 @@ fn test_increasing_gap(run_normal: bool, run_warp: bool) {
}
if run_warp {
let (new_tree, new_witnesses) = advance_tree(tree2, &ws2, &mut nodes);
let (new_tree, new_witnesses) = advance_tree(&tree2, &ws2, &mut nodes, first_block);
tree2 = new_tree;
ws2 = new_witnesses;
}
@ -124,6 +130,11 @@ fn test_increasing_gap(run_normal: bool, run_warp: bool) {
eprintln!("{}, {}, {}", i, node_count, start.elapsed().as_millis());
}
if run_warp {
let (_, new_witnesses) = advance_tree(&tree2, &ws2, &mut [], false);
ws2 = new_witnesses;
}
println!("# witnesses = {}", ws2.len());
}

View File

@ -1,7 +1,7 @@
use sync::{DbAdapter, Wallet, DEFAULT_ACCOUNT};
use bip39::{Language, Mnemonic};
use rand::rngs::OsRng;
use rand::RngCore;
use sync::{DbAdapter, Wallet, DEFAULT_ACCOUNT, ChainError};
const DB_NAME: &str = "zec.db";
@ -23,8 +23,16 @@ async fn test() -> anyhow::Result<()> {
};
let wallet = Wallet::new(DB_NAME);
wallet.new_account_with_seed(&seed).unwrap();
wallet.sync(DEFAULT_ACCOUNT, progress).await.unwrap();
let tx_id = wallet.send_payment(DEFAULT_ACCOUNT, &address, 1000).await.unwrap();
let res = wallet.sync(DEFAULT_ACCOUNT, progress).await;
if let Err(err) = res {
if let Some(_) = err.downcast_ref::<ChainError>() {
println!("REORG");
}
}
let tx_id = wallet
.send_payment(DEFAULT_ACCOUNT, &address, 50000)
.await
.unwrap();
println!("TXID = {}", tx_id);
Ok(())

169
src/mempool.rs Normal file
View File

@ -0,0 +1,169 @@
use crate::chain::to_output_description;
use crate::{
connect_lightwalletd, get_latest_height, CompactTx, CompactTxStreamerClient, DbAdapter,
Exclude, NETWORK,
};
use std::collections::HashMap;
use tonic::transport::Channel;
use tonic::Request;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{BlockHeight, Parameters};
use zcash_primitives::sapling::note_encryption::try_sapling_compact_note_decryption;
use zcash_primitives::sapling::SaplingIvk;
const DEFAULT_EXCLUDE_LEN: u8 = 1;
struct MemPoolTransacton {
#[allow(dead_code)]
balance: i64, // negative if spent
exclude_len: u8,
}
pub struct MemPool {
db_path: String,
ivk: Option<SaplingIvk>,
height: BlockHeight,
transactions: HashMap<Vec<u8>, MemPoolTransacton>,
nfs: HashMap<Vec<u8>, u64>,
balance: i64,
}
impl MemPool {
pub fn new(db_path: &str) -> MemPool {
MemPool {
db_path: db_path.to_string(),
ivk: None,
height: BlockHeight::from(0),
transactions: HashMap::new(),
nfs: HashMap::new(),
balance: 0,
}
}
pub fn set_account(&mut self, account: u32) -> anyhow::Result<()> {
let db = DbAdapter::new(&self.db_path)?;
let ivk = db.get_ivk(account)?;
self.set_ivk(&ivk);
Ok(())
}
pub fn set_ivk(&mut self, ivk: &str) {
let fvk =
decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &ivk)
.unwrap()
.unwrap();
let ivk = fvk.fvk.vk.ivk();
self.ivk = Some(ivk);
}
pub async fn scan(&mut self) -> anyhow::Result<()> {
if self.ivk.is_some() {
let ivk = self.ivk.as_ref().unwrap().clone();
let mut client = connect_lightwalletd().await?;
let height = BlockHeight::from(get_latest_height(&mut client).await?);
if self.height != height {
// New blocks invalidate the mempool
let db = DbAdapter::new(&self.db_path)?;
self.clear(&db)?;
}
self.height = height;
self.update(&mut client, &ivk).await?;
}
Ok(())
}
pub fn get_unconfirmed_balance(&self) -> i64 {
self.balance
}
fn clear(&mut self, db: &DbAdapter) -> anyhow::Result<()> {
self.height = BlockHeight::from_u32(0);
self.nfs = db.get_nullifier_amounts()?;
self.transactions.clear();
self.balance = 0;
Ok(())
}
async fn update(
&mut self,
client: &mut CompactTxStreamerClient<Channel>,
ivk: &SaplingIvk,
) -> anyhow::Result<()> {
let filter: Vec<_> = self
.transactions
.iter()
.map(|(hash, tx)| {
let mut hash = hash.clone();
hash.truncate(tx.exclude_len as usize);
hash
})
.collect();
let exclude = Exclude { txid: filter };
let mut txs = client
.get_mempool_tx(Request::new(exclude))
.await?
.into_inner();
while let Some(tx) = txs.message().await? {
match self.transactions.get_mut(&*tx.hash) {
Some(tx) => {
tx.exclude_len += 1; // server sent us the same tx: make the filter more specific
}
None => {
let balance = self.scan_transaction(&tx, ivk);
let mempool_tx = MemPoolTransacton {
balance,
exclude_len: DEFAULT_EXCLUDE_LEN,
};
self.balance += balance;
self.transactions.insert(tx.hash.clone(), mempool_tx);
}
}
}
Ok(())
}
fn scan_transaction(&self, tx: &CompactTx, ivk: &SaplingIvk) -> i64 {
let mut balance = 0i64;
for cs in tx.spends.iter() {
if let Some(&value) = self.nfs.get(&*cs.nf) {
// nf recognized -> value is spent
balance -= value as i64;
}
}
for co in tx.outputs.iter() {
let od = to_output_description(co);
if let Some((note, _)) =
try_sapling_compact_note_decryption(&NETWORK, self.height, ivk, &od)
{
balance += note.value as i64; // value is incoming
}
}
balance
}
}
#[cfg(test)]
mod tests {
use crate::db::DEFAULT_DB_PATH;
use crate::mempool::MemPool;
use crate::DbAdapter;
use std::time::Duration;
#[tokio::test]
async fn test_mempool() {
let db = DbAdapter::new(DEFAULT_DB_PATH).unwrap();
let ivk = db.get_ivk(1).unwrap();
let mut mempool = MemPool::new("zec.db");
mempool.set_ivk(&ivk);
loop {
mempool.scan().await.unwrap();
let unconfirmed = mempool.get_unconfirmed_balance();
println!("{}", unconfirmed);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}

View File

@ -1,6 +1,6 @@
use zcash_primitives::sapling::Node;
use crate::{CTree, Witness};
use zcash_primitives::merkle_tree::{CommitmentTree, IncrementalWitness};
use crate::{Witness, CTree};
use zcash_primitives::sapling::Node;
#[allow(dead_code)]
pub fn print_node(n: &Node) {
@ -52,5 +52,4 @@ pub fn print_witness2(w: &Witness) {
for p in t.parents.iter() {
println!("{:?}", p.map(|n| hex::encode(n.repr)));
}
}

View File

@ -1,18 +1,22 @@
use zcash_primitives::sapling::Node;
use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient;
use crate::{DecryptNode, LWD_URL, get_latest_height, download_chain, calculate_tree_state_v2, CompactBlock, NETWORK, connect_lightwalletd, Witness, advance_tree};
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use tokio::sync::mpsc;
use std::time::Instant;
use std::ops::Range;
use log::info;
use crate::db::{DbAdapter, ReceivedNote};
use ff::PrimeField;
use zcash_primitives::zip32::ExtendedFullViewingKey;
use crate::builder::BlockProcessor;
use crate::chain::Nf;
use crate::db::{DbAdapter, ReceivedNote};
use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient;
use crate::{
calculate_tree_state_v2, connect_lightwalletd, download_chain, get_latest_height, CompactBlock,
DecryptNode, Witness, LWD_URL, NETWORK,
};
use ff::PrimeField;
use log::{debug, info};
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use zcash_primitives::sapling::Node;
use zcash_primitives::zip32::ExtendedFullViewingKey;
pub async fn scan_all(fvks: &[ExtendedFullViewingKey]) -> anyhow::Result<()> {
let decrypter = DecryptNode::new(fvks.to_vec());
@ -26,7 +30,7 @@ pub async fn scan_all(fvks: &[ExtendedFullViewingKey]) -> anyhow::Result<()> {
let end_height = get_latest_height(&mut client).await?;
let start = Instant::now();
let cbs = download_chain(&mut client, start_height, end_height).await?;
let cbs = download_chain(&mut client, start_height, end_height, None).await?;
info!("Download chain: {} ms", start.elapsed().as_millis());
let start = Instant::now();
@ -35,7 +39,7 @@ pub async fn scan_all(fvks: &[ExtendedFullViewingKey]) -> anyhow::Result<()> {
let witnesses = calculate_tree_state_v2(&cbs, &blocks);
info!("# Witnesses {}", witnesses.len());
debug!("# Witnesses {}", witnesses.len());
for w in witnesses.iter() {
let mut bb: Vec<u8> = vec![];
w.write(&mut bb)?;
@ -51,6 +55,7 @@ struct Blocks(Vec<CompactBlock>);
struct BlockMetadata {
height: u32,
hash: [u8; 32],
timestamp: u32,
}
impl std::fmt::Debug for Blocks {
@ -61,31 +66,42 @@ impl std::fmt::Debug for Blocks {
pub type ProgressCallback = Arc<Mutex<dyn Fn(u32) + Send>>;
pub async fn sync_async(ivk: &str, chunk_size: u32, db_path: &str, target_height_offset: u32, progress_callback: ProgressCallback) -> anyhow::Result<()> {
pub async fn sync_async(
ivk: &str,
chunk_size: u32,
db_path: &str,
target_height_offset: u32,
progress_callback: ProgressCallback,
) -> anyhow::Result<()> {
let db_path = db_path.to_string();
let fvk =
decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &ivk)
?.ok_or_else(|| anyhow::anyhow!("Invalid key"))?;
decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &ivk)?
.ok_or_else(|| anyhow::anyhow!("Invalid key"))?;
let decrypter = DecryptNode::new(vec![fvk]);
let mut client = connect_lightwalletd().await?;
let start_height = {
let (start_height, mut prev_hash) = {
let db = DbAdapter::new(&db_path)?;
db.get_db_height()?
let height = db.get_db_height()?;
(height, db.get_db_hash(height)?)
};
let end_height = get_latest_height(&mut client).await?;
let end_height = (end_height - target_height_offset).max(start_height);
let (downloader_tx, mut download_rx) = mpsc::channel::<Range<u32>>(2);
let (processor_tx, mut processor_rx) = mpsc::channel::<Blocks>(1);
let (completed_tx, mut completed_rx) = mpsc::channel::<()>(1);
let downloader = tokio::spawn(async move {
let mut client = connect_lightwalletd().await?;
while let Some(range) = download_rx.recv().await {
log::info!("+ {:?}", range);
let blocks = download_chain(&mut client, range.start, range.end).await?;
log::info!("- {:?}", range);
let blocks = download_chain(&mut client, range.start, range.end, prev_hash).await?;
log::debug!("- {:?}", range);
blocks.last().map(|cb| {
let mut ph = [0u8; 32];
ph.copy_from_slice(&cb.hash);
prev_hash = Some(ph);
});
let b = Blocks(blocks);
processor_tx.send(b).await?;
}
@ -102,48 +118,84 @@ pub async fn sync_async(ivk: &str, chunk_size: u32, db_path: &str, target_height
let mut nfs = db.get_nullifiers()?;
let (mut tree, mut witnesses) = db.get_tree()?;
let mut bp = BlockProcessor::new(&tree, &witnesses);
let mut absolute_position_at_block_start = tree.get_position();
let mut last_block: Option<BlockMetadata> = None;
while let Some(blocks) = processor_rx.recv().await {
log::info!("{:?}", blocks);
if blocks.0.is_empty() { continue }
if blocks.0.is_empty() {
continue;
}
let dec_blocks = decrypter.decrypt_blocks(&blocks.0);
let mut witnesses: Vec<Witness> = vec![];
for b in dec_blocks.iter() {
let mut my_nfs: Vec<Nf> = vec![];
for nf in b.spends.iter() {
if let Some(&id) = nfs.get(nf) {
println!("NF FOUND {} {}", id, b.height);
log::info!("NF FOUND {} {}", id, b.height);
db.mark_spent(id, b.height)?;
my_nfs.push(*nf);
}
}
if !b.notes.is_empty() {
log::info!("{} {}", b.height, b.notes.len());
for nf in b.spends.iter() {
println!("{}", hex::encode(nf.0));
}
log::debug!("{} {}", b.height, b.notes.len());
}
for n in b.notes.iter() {
let p = absolute_position_at_block_start + n.position_in_block;
let note = &n.note;
let id_tx = db.store_transaction(&n.txid, n.height, n.tx_index as u32)?;
let rcm = note.rcm().to_repr();
let nf = note.nf(&n.ivk.fvk.vk, p as u64);
let id_note = db.store_received_note(&ReceivedNote {
let id_tx = db.store_transaction(
&n.txid,
n.height,
b.compact_block.time,
n.tx_index as u32,
)?;
let id_note = db.store_received_note(
&ReceivedNote {
height: n.height,
output_index: n.output_index as u32,
diversifier: n.pa.diversifier().0.to_vec(),
value: note.value,
rcm: rcm.to_vec(),
nf: nf.0.to_vec(),
spent: None
}, id_tx, n.position_in_block)?;
spent: None,
},
id_tx,
n.position_in_block,
)?;
db.add_value(id_tx, note.value as i64)?;
nfs.insert(Nf(nf.0), id_note);
let w = Witness::new(p as usize, id_note, Some(n.clone()));
witnesses.push(w);
}
if !my_nfs.is_empty() {
for (tx_index, tx) in b.compact_block.vtx.iter().enumerate() {
for cs in tx.spends.iter() {
let mut nf = [0u8; 32];
nf.copy_from_slice(&cs.nf);
let nf = Nf(nf);
if my_nfs.contains(&nf) {
let note_value = db.get_received_note_value(&nf)?;
let txid = &*tx.hash;
let id_tx = db.store_transaction(
txid,
b.height,
b.compact_block.time,
tx_index as u32,
)?;
db.add_value(id_tx, -(note_value as i64))?;
}
}
}
}
absolute_position_at_block_start += b.count_outputs as usize;
}
@ -159,9 +211,9 @@ pub async fn sync_async(ivk: &str, chunk_size: u32, db_path: &str, target_height
}
}
let (new_tree, new_witnesses) = advance_tree(tree, &witnesses, &mut nodes);
tree = new_tree;
witnesses = new_witnesses;
if !nodes.is_empty() {
bp.add_nodes(&mut nodes, &witnesses);
}
if let Some(block) = blocks.0.last() {
let mut hash = [0u8; 32];
@ -169,29 +221,30 @@ pub async fn sync_async(ivk: &str, chunk_size: u32, db_path: &str, target_height
last_block = Some(BlockMetadata {
height: block.height as u32,
hash,
timestamp: block.time,
});
}
log::info!("progress: {}", blocks.0[0].height);
let callback = proc_callback.lock().await;
callback(blocks.0[0].height as u32);
}
// Finalize scan
let (new_tree, new_witnesses) = advance_tree(tree, &witnesses, &mut []);
let (new_tree, new_witnesses) = bp.finalize();
tree = new_tree;
witnesses = new_witnesses;
if let Some(last_block) = last_block {
let last_height = last_block.height;
db.store_block(last_height, &last_block.hash, &tree)?;
db.store_block(last_height, &last_block.hash, last_block.timestamp, &tree)?;
for w in witnesses.iter() {
db.store_witnesses(w, last_height, w.id_note)?;
}
}
// let callback = progress_callback.lock()?;
// callback(end_height);
log::info!("Witnesses {}", witnesses.len());
drop(completed_tx);
let callback = progress_callback.lock().await;
callback(end_height);
log::debug!("Witnesses {}", witnesses.len());
Ok::<_, anyhow::Error>(())
});
@ -202,24 +255,32 @@ pub async fn sync_async(ivk: &str, chunk_size: u32, db_path: &str, target_height
let e = (height + chunk_size).min(end_height);
let range = s..e;
downloader_tx.send(range).await?;
let _ = downloader_tx.send(range).await;
height = e;
}
drop(downloader_tx);
log::info!("req downloading completed");
completed_rx.recv().await;
log::info!("completed");
let res = tokio::try_join!(downloader, processor);
match res {
Ok((a, b)) => {
if let Err(err) = a { log::info!("Downloader error = {}", err) }
if let Err(err) = b { log::info!("Processor error = {}", err) }
},
Err(err) => log::info!("Sync error = {}", err),
Ok((d, p)) => {
if let Err(err) = d {
log::info!("Downloader error = {}", err);
return Err(err);
}
if let Err(err) = p {
log::info!("Processor error = {}", err);
return Err(err);
}
}
Err(err) => {
log::info!("Sync error = {}", err);
anyhow::bail!("Join Error");
},
}
log::info!("Sync completed");
Ok(())
}

View File

@ -1,30 +1,52 @@
use crate::{NETWORK, get_latest_height, connect_lightwalletd, DbAdapter, get_secret_key, get_viewing_key, get_address};
use zcash_client_backend::address::RecipientAddress;
use zcash_primitives::transaction::components::Amount;
use zcash_primitives::transaction::builder::Builder;
use zcash_client_backend::encoding::decode_extended_spending_key;
use zcash_primitives::consensus::{Parameters, BlockHeight, BranchId};
use zcash_primitives::zip32::ExtendedFullViewingKey;
use zcash_client_backend::data_api::wallet::ANCHOR_OFFSET;
use crate::chain::send_transaction;
use crate::mempool::MemPool;
use crate::scan::ProgressCallback;
use crate::{connect_lightwalletd, get_address, get_latest_height, get_secret_key, get_viewing_key, DbAdapter, NETWORK, BlockId, CTree};
use anyhow::Context;
use bip39::{Language, Mnemonic};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use zcash_proofs::prover::LocalTxProver;
use crate::chain::send_transaction;
use zcash_params::{SPEND_PARAMS, OUTPUT_PARAMS};
use rand::RngCore;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::scan::ProgressCallback;
use zcash_client_backend::address::RecipientAddress;
use zcash_client_backend::data_api::wallet::ANCHOR_OFFSET;
use zcash_client_backend::encoding::{decode_extended_spending_key, decode_payment_address};
use zcash_params::{OUTPUT_PARAMS, SPEND_PARAMS};
use zcash_primitives::consensus::{BlockHeight, BranchId, Parameters};
use zcash_primitives::transaction::builder::Builder;
use zcash_primitives::transaction::components::amount::DEFAULT_FEE;
use zcash_primitives::transaction::components::Amount;
use zcash_primitives::zip32::ExtendedFullViewingKey;
use zcash_proofs::prover::LocalTxProver;
use tonic::Request;
pub const DEFAULT_ACCOUNT: u32 = 1;
const DEFAULT_CHUNK_SIZE: u32 = 100_000;
pub struct Wallet {
db_path: String,
pub db_path: String,
db: DbAdapter,
prover: LocalTxProver,
}
#[repr(C)]
pub struct WalletBalance {
pub confirmed: u64,
pub unconfirmed: i64,
pub spendable: u64,
}
impl Default for WalletBalance {
fn default() -> Self {
WalletBalance {
confirmed: 0,
unconfirmed: 0,
spendable: 0,
}
}
}
impl Wallet {
pub fn new(db_path: &str) -> Wallet {
let prover = LocalTxProver::from_bytes(SPEND_PARAMS, OUTPUT_PARAMS);
@ -37,6 +59,27 @@ impl Wallet {
}
}
pub fn valid_seed(seed: &str) -> bool {
get_secret_key(&seed).is_ok()
}
pub fn valid_address(address: &str) -> bool {
decode_payment_address(NETWORK.hrp_sapling_payment_address(), address).is_ok()
}
pub fn new_seed(&self) -> anyhow::Result<()> {
let mut entropy = [0u8; 32];
OsRng.fill_bytes(&mut entropy);
let mnemonic = Mnemonic::from_entropy(&entropy, Language::English)?;
let seed = mnemonic.phrase();
self.new_account_with_seed(seed)?;
Ok(())
}
pub fn has_account(&self, account: u32) -> anyhow::Result<bool> {
self.db.has_account(account)
}
pub fn new_account_with_seed(&self, seed: &str) -> anyhow::Result<()> {
let sk = get_secret_key(&seed).unwrap();
let vk = get_viewing_key(&sk).unwrap();
@ -45,8 +88,21 @@ impl Wallet {
Ok(())
}
async fn scan_async(&self, ivk: &str, chunk_size: u32, target_height_offset: u32, progress_callback: ProgressCallback) -> anyhow::Result<()> {
crate::scan::sync_async(ivk, chunk_size, &self.db_path, target_height_offset, progress_callback).await
async fn scan_async(
ivk: &str,
db_path: &str,
chunk_size: u32,
target_height_offset: u32,
progress_callback: ProgressCallback,
) -> anyhow::Result<()> {
crate::scan::sync_async(
ivk,
chunk_size,
db_path,
target_height_offset,
progress_callback,
)
.await
}
pub async fn get_latest_height() -> anyhow::Result<u32> {
@ -55,30 +111,82 @@ impl Wallet {
Ok(last_height)
}
pub async fn sync(&self, account: u32, progress_callback: impl Fn(u32) + Send + 'static) -> anyhow::Result<()> {
let ivk = self.db.get_ivk(account)?;
// Not a method in order to avoid locking the instance
pub async fn sync_ex(
db_path: &str,
ivk: &str,
progress_callback: impl Fn(u32) + Send + 'static,
) -> anyhow::Result<()> {
let cb = Arc::new(Mutex::new(progress_callback));
self.scan_async(&ivk, DEFAULT_CHUNK_SIZE, 10, cb.clone()).await?;
self.scan_async(&ivk, DEFAULT_CHUNK_SIZE, 0, cb.clone()).await?;
Self::scan_async(&ivk, db_path, DEFAULT_CHUNK_SIZE, 10, cb.clone()).await?;
Self::scan_async(&ivk, db_path, DEFAULT_CHUNK_SIZE, 0, cb.clone()).await?;
Ok(())
}
pub fn get_balance(&self) -> anyhow::Result<u64> {
self.db.get_balance()
pub async fn sync(
&self,
account: u32,
progress_callback: impl Fn(u32) + Send + 'static,
) -> anyhow::Result<()> {
let ivk = self.get_ivk(account)?;
Self::sync_ex(&self.db_path, &ivk, progress_callback).await
}
pub async fn send_payment(&self, account: u32, to_address: &str, amount: u64) -> anyhow::Result<String> {
pub async fn skip_to_last_height(&self) -> anyhow::Result<()> {
let mut client = connect_lightwalletd().await?;
let last_height = get_latest_height(&mut client).await?;
let block_id = BlockId {
height: last_height as u64,
hash: vec![],
};
let block = client.get_block(block_id.clone()).await?.into_inner();
let tree_state = client.get_tree_state(Request::new(block_id)).await?.into_inner();
let tree = CTree::read(&*hex::decode(&tree_state.tree)?)?;
self.db.store_block(last_height, &block.hash, block.time, &tree)?;
Ok(())
}
pub fn rewind_to_height(&mut self, height: u32) -> anyhow::Result<()> {
self.db.trim_to_height(height)
}
pub async fn get_balance(&self, mempool: &MemPool) -> anyhow::Result<WalletBalance> {
let last_height = Self::get_latest_height().await?;
let anchor_height = last_height - ANCHOR_OFFSET;
let confirmed = self.db.get_balance()?;
let unconfirmed = mempool.get_unconfirmed_balance();
let spendable = self.db.get_spendable_balance(anchor_height)?;
Ok(WalletBalance {
confirmed,
unconfirmed,
spendable,
})
}
pub async fn send_payment(
&self,
account: u32,
to_address: &str,
amount: u64,
) -> anyhow::Result<String> {
let secret_key = self.db.get_sk(account)?;
let to_addr = RecipientAddress::decode(&NETWORK, to_address)
.ok_or(anyhow::anyhow!("Invalid address"))?;
let target_amount = Amount::from_u64(amount).unwrap();
let skey = decode_extended_spending_key(NETWORK.hrp_sapling_extended_spending_key(), &secret_key)?.unwrap();
let skey =
decode_extended_spending_key(NETWORK.hrp_sapling_extended_spending_key(), &secret_key)?
.unwrap();
let extfvk = ExtendedFullViewingKey::from(&skey);
let (_, change_address) = extfvk.default_address().unwrap();
let ovk = extfvk.fvk.ovk;
let last_height = Self::get_latest_height().await?;
let mut builder = Builder::new(NETWORK, BlockHeight::from_u32(last_height));
let anchor_height = self.db.get_last_sync_height()?.ok_or_else(|| anyhow::anyhow!("No spendable notes"))?;
let anchor_height = self
.db
.get_last_sync_height()?
.ok_or_else(|| anyhow::anyhow!("No spendable notes"))?;
let anchor_height = anchor_height.min(last_height - ANCHOR_OFFSET);
log::info!("Anchor = {}", anchor_height);
let mut notes = self.db.get_spendable_notes(anchor_height, &extfvk)?;
@ -89,37 +197,58 @@ impl Wallet {
amount += DEFAULT_FEE;
for n in notes {
if amount.is_positive() {
let a = amount.min(Amount::from_u64(n.note.value).unwrap());
let a = amount.min(
Amount::from_u64(n.note.value)
.map_err(|_| anyhow::anyhow!("Invalid amount"))?,
);
amount -= a;
let merkle_path = n.witness.path().unwrap();
builder.add_sapling_spend(skey.clone(), n.diversifier, n.note.clone(), merkle_path)?;
let merkle_path = n.witness.path().context("Invalid Merkle Path")?;
builder.add_sapling_spend(
skey.clone(),
n.diversifier,
n.note.clone(),
merkle_path,
)?;
}
}
if amount.is_positive() {
anyhow::bail!("Not enough balance")
log::info!("Not enough balance");
return Ok("".to_string());
}
log::info!("Preparing tx");
builder.send_change_to(Some(ovk), change_address);
match to_addr {
RecipientAddress::Shielded(pa) => builder.add_sapling_output(Some(ovk), pa, target_amount, None),
RecipientAddress::Transparent(t_address) => builder.add_transparent_output(&t_address, target_amount),
RecipientAddress::Shielded(pa) => {
builder.add_sapling_output(Some(ovk), pa, target_amount, None)
}
RecipientAddress::Transparent(t_address) => {
builder.add_transparent_output(&t_address, target_amount)
}
}?;
let consensus_branch_id = BranchId::for_height(&NETWORK, BlockHeight::from_u32(last_height));
let consensus_branch_id =
BranchId::for_height(&NETWORK, BlockHeight::from_u32(last_height));
let (tx, _) = builder.build(consensus_branch_id, &self.prover)?;
log::info!("Tx built");
let mut raw_tx: Vec<u8> = vec![];
tx.write(&mut raw_tx)?;
let mut client = connect_lightwalletd().await?;
let tx_id = send_transaction(&mut client, &raw_tx, last_height).await?;
log::info!("Tx ID = {}", tx_id);
Ok(tx_id)
}
pub fn get_ivk(&self, account: u32) -> anyhow::Result<String> {
self.db.get_ivk(account)
}
}
#[cfg(test)]
mod tests {
use crate::{get_secret_key, get_viewing_key, get_address};
use crate::wallet::Wallet;
use crate::{get_address, get_secret_key, get_viewing_key};
#[tokio::test]
async fn test_wallet_seed() {

BIN
zec.db-journal Normal file

Binary file not shown.