zcash-sync/src/scan.rs

310 lines
11 KiB
Rust
Raw Normal View History

2021-06-26 02:52:03 -07:00
use crate::builder::BlockProcessor;
2021-06-29 00:04:12 -07:00
use crate::chain::{Nf, NfRef};
2021-06-21 17:33:13 -07:00
use crate::db::{DbAdapter, ReceivedNote};
2021-06-26 02:52:03 -07:00
use crate::lw_rpc::compact_tx_streamer_client::CompactTxStreamerClient;
use crate::{
calculate_tree_state_v2, connect_lightwalletd, download_chain, get_latest_height, CompactBlock,
DecryptNode, Witness, LWD_URL, NETWORK,
};
2021-06-21 17:33:13 -07:00
use ff::PrimeField;
2021-06-26 02:52:03 -07:00
use log::{debug, info};
use std::ops::Range;
2021-06-24 05:08:20 -07:00
use std::sync::Arc;
2021-06-26 02:52:03 -07:00
use std::time::Instant;
use tokio::sync::mpsc;
2021-06-24 05:08:20 -07:00
use tokio::sync::Mutex;
2021-06-26 02:52:03 -07:00
use zcash_client_backend::encoding::decode_extended_full_viewing_key;
use zcash_primitives::consensus::{NetworkUpgrade, Parameters};
use zcash_primitives::sapling::Node;
use zcash_primitives::zip32::ExtendedFullViewingKey;
2021-06-28 21:49:00 -07:00
use std::panic;
2021-06-29 00:04:12 -07:00
use std::collections::HashMap;
2021-06-18 01:17:41 -07:00
2021-06-21 17:33:13 -07:00
pub async fn scan_all(fvks: &[ExtendedFullViewingKey]) -> anyhow::Result<()> {
2021-06-29 00:04:12 -07:00
let fvks: HashMap<_, _> = fvks.iter().enumerate().map(|(i, fvk)|
(i as u32, fvk.clone())).collect();
let decrypter = DecryptNode::new(fvks);
2021-06-18 01:17:41 -07:00
let total_start = Instant::now();
let mut client = CompactTxStreamerClient::connect(LWD_URL).await?;
let start_height: u32 = crate::NETWORK
.activation_height(NetworkUpgrade::Sapling)
.unwrap()
.into();
let end_height = get_latest_height(&mut client).await?;
let start = Instant::now();
2021-06-26 02:52:03 -07:00
let cbs = download_chain(&mut client, start_height, end_height, None).await?;
2021-06-18 01:17:41 -07:00
info!("Download chain: {} ms", start.elapsed().as_millis());
let start = Instant::now();
let blocks = decrypter.decrypt_blocks(&cbs);
info!("Decrypt Notes: {} ms", start.elapsed().as_millis());
let witnesses = calculate_tree_state_v2(&cbs, &blocks);
2021-06-26 02:52:03 -07:00
debug!("# Witnesses {}", witnesses.len());
2021-06-18 01:17:41 -07:00
for w in witnesses.iter() {
let mut bb: Vec<u8> = vec![];
2021-06-24 05:08:20 -07:00
w.write(&mut bb)?;
2021-06-18 01:17:41 -07:00
log::debug!("{}", hex::encode(&bb));
}
info!("Total: {} ms", total_start.elapsed().as_millis());
Ok(())
}
2021-06-21 17:33:13 -07:00
/// Batch of downloaded compact blocks, used as the message type of the
/// channel between the downloader and the processor tasks in `sync_async`.
struct Blocks(Vec<CompactBlock>);
2021-06-24 05:08:20 -07:00
/// Height, hash and timestamp of a single block.
///
/// `sync_async` keeps one of these for the last block of the most recently
/// processed batch so it can be stored once scanning finishes.
#[derive(Debug, Clone, PartialEq, Eq)]
struct BlockMetadata {
    /// Height of the block.
    height: u32,
    /// 32-byte block hash.
    hash: [u8; 32],
    /// Block time, copied from the compact block's `time` field.
    timestamp: u32,
}
2021-06-21 17:33:13 -07:00
impl std::fmt::Debug for Blocks {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Blocks of len {}", self.0.len())
}
}
2021-06-24 05:08:20 -07:00
/// Shared, lockable progress callback. `sync_async` locks it and calls it
/// with a block height (`u32`) after each processed batch and once more with
/// the end height when scanning finishes.
pub type ProgressCallback = Arc<Mutex<dyn Fn(u32) + Send>>;
2021-06-21 17:33:13 -07:00
2021-06-26 02:52:03 -07:00
pub async fn sync_async(
chunk_size: u32,
db_path: &str,
target_height_offset: u32,
progress_callback: ProgressCallback,
) -> anyhow::Result<()> {
2021-06-21 17:33:13 -07:00
let db_path = db_path.to_string();
let mut client = connect_lightwalletd().await?;
2021-06-29 00:04:12 -07:00
let (start_height, mut prev_hash, fvks) = {
2021-06-24 05:08:20 -07:00
let db = DbAdapter::new(&db_path)?;
2021-06-26 02:52:03 -07:00
let height = db.get_db_height()?;
2021-06-29 00:04:12 -07:00
let hash = db.get_db_hash(height)?;
let fvks = db.get_fvks()?;
(height, hash, fvks)
2021-06-24 05:08:20 -07:00
};
2021-06-21 17:33:13 -07:00
let end_height = get_latest_height(&mut client).await?;
2021-06-24 05:08:20 -07:00
let end_height = (end_height - target_height_offset).max(start_height);
2021-06-21 17:33:13 -07:00
2021-06-29 00:04:12 -07:00
let fvks: HashMap<_, _> = fvks.iter().map(|(&account, fvk)| {
let fvk =
decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &fvk)
.unwrap()
.unwrap();
(account, fvk)
}).collect();
let decrypter = DecryptNode::new(fvks);
2021-06-21 17:33:13 -07:00
let (downloader_tx, mut download_rx) = mpsc::channel::<Range<u32>>(2);
2021-06-24 05:08:20 -07:00
let (processor_tx, mut processor_rx) = mpsc::channel::<Blocks>(1);
2021-06-21 17:33:13 -07:00
2021-06-24 05:08:20 -07:00
let downloader = tokio::spawn(async move {
let mut client = connect_lightwalletd().await?;
2021-06-21 17:33:13 -07:00
while let Some(range) = download_rx.recv().await {
2021-06-24 05:08:20 -07:00
log::info!("+ {:?}", range);
2021-06-26 02:52:03 -07:00
let blocks = download_chain(&mut client, range.start, range.end, prev_hash).await?;
log::debug!("- {:?}", range);
blocks.last().map(|cb| {
let mut ph = [0u8; 32];
ph.copy_from_slice(&cb.hash);
prev_hash = Some(ph);
});
2021-06-21 17:33:13 -07:00
let b = Blocks(blocks);
2021-06-24 05:08:20 -07:00
processor_tx.send(b).await?;
2021-06-21 17:33:13 -07:00
}
log::info!("download completed");
drop(processor_tx);
2021-06-24 05:08:20 -07:00
Ok::<_, anyhow::Error>(())
2021-06-21 17:33:13 -07:00
});
2021-06-24 05:08:20 -07:00
let proc_callback = progress_callback.clone();
let processor = tokio::spawn(async move {
let db = DbAdapter::new(&db_path)?;
let mut nfs = db.get_nullifiers()?;
let (mut tree, mut witnesses) = db.get_tree()?;
2021-06-26 02:52:03 -07:00
let mut bp = BlockProcessor::new(&tree, &witnesses);
2021-06-24 05:08:20 -07:00
let mut absolute_position_at_block_start = tree.get_position();
let mut last_block: Option<BlockMetadata> = None;
2021-06-21 17:33:13 -07:00
while let Some(blocks) = processor_rx.recv().await {
log::info!("{:?}", blocks);
2021-06-26 02:52:03 -07:00
if blocks.0.is_empty() {
continue;
}
2021-06-21 17:33:13 -07:00
let dec_blocks = decrypter.decrypt_blocks(&blocks.0);
2021-06-26 02:52:03 -07:00
let mut witnesses: Vec<Witness> = vec![];
2021-06-21 17:33:13 -07:00
for b in dec_blocks.iter() {
2021-06-26 02:52:03 -07:00
let mut my_nfs: Vec<Nf> = vec![];
2021-06-24 05:08:20 -07:00
for nf in b.spends.iter() {
2021-06-29 00:04:12 -07:00
if let Some(&nf_ref) = nfs.get(nf) {
log::info!("NF FOUND {} {}", nf_ref.id_note, b.height);
db.mark_spent(nf_ref.id_note, b.height)?;
2021-06-26 02:52:03 -07:00
my_nfs.push(*nf);
2021-06-24 05:08:20 -07:00
}
}
2021-06-21 17:33:13 -07:00
if !b.notes.is_empty() {
2021-06-26 02:52:03 -07:00
log::debug!("{} {}", b.height, b.notes.len());
2021-06-21 17:33:13 -07:00
}
2021-06-26 02:52:03 -07:00
2021-06-21 17:33:13 -07:00
for n in b.notes.iter() {
2021-06-24 05:08:20 -07:00
let p = absolute_position_at_block_start + n.position_in_block;
2021-06-21 17:33:13 -07:00
let note = &n.note;
let rcm = note.rcm().to_repr();
2021-06-24 05:08:20 -07:00
let nf = note.nf(&n.ivk.fvk.vk, p as u64);
2021-06-21 17:33:13 -07:00
2021-06-26 02:52:03 -07:00
let id_tx = db.store_transaction(
&n.txid,
2021-06-29 00:04:12 -07:00
n.account,
2021-06-26 02:52:03 -07:00
n.height,
b.compact_block.time,
n.tx_index as u32,
)?;
let id_note = db.store_received_note(
&ReceivedNote {
2021-06-29 00:04:12 -07:00
account: n.account,
2021-06-26 02:52:03 -07:00
height: n.height,
output_index: n.output_index as u32,
diversifier: n.pa.diversifier().0.to_vec(),
value: note.value,
rcm: rcm.to_vec(),
nf: nf.0.to_vec(),
spent: None,
},
id_tx,
n.position_in_block,
)?;
db.add_value(id_tx, note.value as i64)?;
2021-06-29 00:04:12 -07:00
nfs.insert(Nf(nf.0), NfRef { id_note, account: n.account });
2021-06-21 17:33:13 -07:00
let w = Witness::new(p as usize, id_note, Some(n.clone()));
witnesses.push(w);
}
2021-06-26 02:52:03 -07:00
if !my_nfs.is_empty() {
for (tx_index, tx) in b.compact_block.vtx.iter().enumerate() {
for cs in tx.spends.iter() {
let mut nf = [0u8; 32];
nf.copy_from_slice(&cs.nf);
let nf = Nf(nf);
if my_nfs.contains(&nf) {
2021-06-29 00:04:12 -07:00
let (account, note_value) = db.get_received_note_value(&nf)?;
2021-06-26 02:52:03 -07:00
let txid = &*tx.hash;
let id_tx = db.store_transaction(
txid,
2021-06-29 00:04:12 -07:00
account,
2021-06-26 02:52:03 -07:00
b.height,
b.compact_block.time,
tx_index as u32,
)?;
db.add_value(id_tx, -(note_value as i64))?;
}
}
}
}
2021-06-24 05:08:20 -07:00
absolute_position_at_block_start += b.count_outputs as usize;
2021-06-21 17:33:13 -07:00
}
let mut nodes: Vec<Node> = vec![];
for cb in blocks.0.iter() {
for tx in cb.vtx.iter() {
for co in tx.outputs.iter() {
let mut cmu = [0u8; 32];
cmu.copy_from_slice(&co.cmu);
let node = Node::new(cmu);
nodes.push(node);
}
}
}
2021-06-26 02:52:03 -07:00
if !nodes.is_empty() {
bp.add_nodes(&mut nodes, &witnesses);
}
2021-06-28 21:49:00 -07:00
// println!("NOTES = {}", nodes.len());
2021-06-21 17:33:13 -07:00
2021-06-24 05:08:20 -07:00
if let Some(block) = blocks.0.last() {
let mut hash = [0u8; 32];
hash.copy_from_slice(&block.hash);
last_block = Some(BlockMetadata {
height: block.height as u32,
hash,
2021-06-26 02:52:03 -07:00
timestamp: block.time,
2021-06-24 05:08:20 -07:00
});
2021-06-21 17:33:13 -07:00
}
2021-06-26 02:52:03 -07:00
log::info!("progress: {}", blocks.0[0].height);
2021-06-24 05:08:20 -07:00
let callback = proc_callback.lock().await;
callback(blocks.0[0].height as u32);
}
// Finalize scan
2021-06-26 02:52:03 -07:00
let (new_tree, new_witnesses) = bp.finalize();
2021-06-24 05:08:20 -07:00
tree = new_tree;
witnesses = new_witnesses;
2021-06-21 17:33:13 -07:00
2021-06-24 05:08:20 -07:00
if let Some(last_block) = last_block {
let last_height = last_block.height;
2021-06-26 02:52:03 -07:00
db.store_block(last_height, &last_block.hash, last_block.timestamp, &tree)?;
2021-06-24 05:08:20 -07:00
for w in witnesses.iter() {
db.store_witnesses(w, last_height, w.id_note)?;
}
2021-06-21 17:33:13 -07:00
}
2021-06-26 02:52:03 -07:00
let callback = progress_callback.lock().await;
callback(end_height);
log::debug!("Witnesses {}", witnesses.len());
2021-06-24 05:08:20 -07:00
Ok::<_, anyhow::Error>(())
2021-06-21 17:33:13 -07:00
});
let mut height = start_height;
while height < end_height {
let s = height;
let e = (height + chunk_size).min(end_height);
let range = s..e;
2021-06-26 02:52:03 -07:00
let _ = downloader_tx.send(range).await;
2021-06-21 17:33:13 -07:00
height = e;
}
drop(downloader_tx);
log::info!("req downloading completed");
2021-06-24 05:08:20 -07:00
let res = tokio::try_join!(downloader, processor);
match res {
2021-06-26 02:52:03 -07:00
Ok((d, p)) => {
if let Err(err) = d {
log::info!("Downloader error = {}", err);
return Err(err);
}
if let Err(err) = p {
log::info!("Processor error = {}", err);
return Err(err);
}
}
Err(err) => {
log::info!("Sync error = {}", err);
2021-06-28 21:49:00 -07:00
if err.is_panic() {
panic::resume_unwind(err.into_panic());
}
2021-06-26 02:52:03 -07:00
anyhow::bail!("Join Error");
2021-06-24 05:08:20 -07:00
},
}
2021-06-26 02:52:03 -07:00
log::info!("Sync completed");
2021-06-21 17:33:13 -07:00
Ok(())
}
2021-06-24 05:08:20 -07:00
/// Opens a new lightwalletd connection and returns the latest block height
/// it reports.
///
/// # Errors
/// Returns an error if the connection or the height query fails.
pub async fn latest_height() -> anyhow::Result<u32> {
    let mut lwd = connect_lightwalletd().await?;
    Ok(get_latest_height(&mut lwd).await?)
}