From 18a52488fe915ab06475fcfe78f59553361bccca Mon Sep 17 00:00:00 2001 From: Hanh Date: Sat, 31 Jul 2021 15:34:57 +0800 Subject: [PATCH] Concurrent get transaction --- Cargo.toml | 1 + src/transaction.rs | 69 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 60 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a00a5d6..cbff7b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ flexi_logger = {version="0.17.1", features = ["compress"]} serde = {version = "1.0.126", features = ["derive"]} serde_json = "1.0.64" tokio = { version = "^1.6", features = ["macros", "rt-multi-thread"] } +tokio-stream = "0.1.7" protobuf = "2.23.0" hex = "0.4.3" futures = "0.3.15" diff --git a/src/transaction.rs b/src/transaction.rs index a283f51..9b42d51 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -13,9 +13,14 @@ use zcash_primitives::sapling::note_encryption::{ }; use zcash_primitives::transaction::Transaction; use zcash_primitives::zip32::ExtendedFullViewingKey; +use futures::StreamExt; +use std::sync::mpsc; +use std::sync::mpsc::SyncSender; #[derive(Debug)] pub struct TransactionInfo { + id_tx: u32, + account: u32, pub address: String, pub memo: String, amount: i64, @@ -31,6 +36,7 @@ pub struct Contact { pub async fn decode_transaction( client: &mut CompactTxStreamerClient, nfs: &HashMap<(u32, Vec), u64>, + id_tx: u32, account: u32, fvk: &ExtendedFullViewingKey, tx_hash: &[u8], @@ -96,6 +102,8 @@ pub async fn decode_transaction( Memo::Arbitrary(_) => "Unrecognized".to_string(), }; let tx_info = TransactionInfo { + id_tx, + account, address, memo, amount, @@ -105,6 +113,17 @@ pub async fn decode_transaction( Ok(tx_info) } +struct DecodeTxParams<'a> { + tx: SyncSender, + client: CompactTxStreamerClient, + nf_map: &'a HashMap<(u32, Vec), u64>, + id_tx: u32, + account: u32, + fvk: ExtendedFullViewingKey, + tx_hash: Vec, + height: u32, +} + pub async fn retrieve_tx_info( client: &mut CompactTxStreamerClient, db_path: &str, @@ -119,6 +138,8 @@ pub async fn retrieve_tx_info( } let mut tx_ids_set: HashSet = HashSet::new(); let mut fvk_cache: HashMap = HashMap::new(); + let mut decode_tx_params: Vec = vec![]; + let (tx, rx) = mpsc::sync_channel::(4); for &id_tx in tx_ids.iter() { // need to keep tx order if tx_ids_set.contains(&id_tx) { @@ -126,20 +147,48 @@ pub async fn retrieve_tx_info( } tx_ids_set.insert(id_tx); let (account, height, tx_hash, ivk) = db.get_txhash(id_tx)?; - let fvk = fvk_cache.entry(account).or_insert_with(|| { + let fvk: &ExtendedFullViewingKey = fvk_cache.entry(account).or_insert_with(|| { decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &ivk) .unwrap() .unwrap() }); - let tx_info = decode_transaction(client, &nf_map, account, fvk, &tx_hash, height).await?; - if !tx_info.address.is_empty() && !tx_info.memo.is_empty() { - if let Some(contact) = decode_contact(&tx_info.address, &tx_info.memo)? { - db.store_contact(account, &contact)?; - } - } - db.store_tx_metadata(id_tx, &tx_info)?; + let params = DecodeTxParams { + tx: tx.clone(), + client: client.clone(), + nf_map: &nf_map, + id_tx, + account, + fvk: fvk.clone(), + tx_hash: tx_hash.clone(), + height, + }; + decode_tx_params.push(params); } - db.commit()?; + + let res = tokio_stream::iter(decode_tx_params).for_each_concurrent(None, |mut p| async move { + if let Ok(tx_info) = decode_transaction(&mut p.client, p.nf_map, p.id_tx, p.account, &p.fvk, &p.tx_hash, p.height).await { + p.tx.send(tx_info).unwrap(); + drop(p.tx); + } + }); + + let f = tokio::spawn(async move { + while let Ok(tx_info) = rx.recv() { + if !tx_info.address.is_empty() && !tx_info.memo.is_empty() { + if let Some(contact) = decode_contact(&tx_info.address, &tx_info.memo)? { + db.store_contact(tx_info.account, &contact)?; + } + } + db.store_tx_metadata(tx_info.id_tx, &tx_info)?; + } + db.commit()?; + + Ok::<_, anyhow::Error>(()) + }); + + res.await; + drop(tx); + f.await??; Ok(()) } @@ -185,7 +234,7 @@ mod tests { decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &fvk) .unwrap() .unwrap(); - let tx_info = decode_transaction(&mut client, &nf_map, account, &fvk, &tx_hash, 1313212) + let tx_info = decode_transaction(&mut client, &nf_map, 1, account, &fvk, &tx_hash, 1313212) .await .unwrap(); println!("{:?}", tx_info);