Concurrent get transaction

This commit is contained in:
Hanh 2021-07-31 15:34:57 +08:00
parent 7590bf179b
commit 18a52488fe
2 changed files with 60 additions and 10 deletions

View File

@ -28,6 +28,7 @@ flexi_logger = {version="0.17.1", features = ["compress"]}
serde = {version = "1.0.126", features = ["derive"]}
serde_json = "1.0.64"
tokio = { version = "^1.6", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.7"
protobuf = "2.23.0"
hex = "0.4.3"
futures = "0.3.15"

View File

@ -13,9 +13,14 @@ use zcash_primitives::sapling::note_encryption::{
};
use zcash_primitives::transaction::Transaction;
use zcash_primitives::zip32::ExtendedFullViewingKey;
use futures::StreamExt;
use std::sync::mpsc;
use std::sync::mpsc::SyncSender;
#[derive(Debug)]
pub struct TransactionInfo {
id_tx: u32,
account: u32,
pub address: String,
pub memo: String,
amount: i64,
@ -31,6 +36,7 @@ pub struct Contact {
pub async fn decode_transaction(
client: &mut CompactTxStreamerClient<Channel>,
nfs: &HashMap<(u32, Vec<u8>), u64>,
id_tx: u32,
account: u32,
fvk: &ExtendedFullViewingKey,
tx_hash: &[u8],
@ -96,6 +102,8 @@ pub async fn decode_transaction(
Memo::Arbitrary(_) => "Unrecognized".to_string(),
};
let tx_info = TransactionInfo {
id_tx,
account,
address,
memo,
amount,
@ -105,6 +113,17 @@ pub async fn decode_transaction(
Ok(tx_info)
}
struct DecodeTxParams<'a> {
tx: SyncSender<TransactionInfo>,
client: CompactTxStreamerClient<Channel>,
nf_map: &'a HashMap<(u32, Vec<u8>), u64>,
id_tx: u32,
account: u32,
fvk: ExtendedFullViewingKey,
tx_hash: Vec<u8>,
height: u32,
}
pub async fn retrieve_tx_info(
client: &mut CompactTxStreamerClient<Channel>,
db_path: &str,
@ -119,6 +138,8 @@ pub async fn retrieve_tx_info(
}
let mut tx_ids_set: HashSet<u32> = HashSet::new();
let mut fvk_cache: HashMap<u32, ExtendedFullViewingKey> = HashMap::new();
let mut decode_tx_params: Vec<DecodeTxParams> = vec![];
let (tx, rx) = mpsc::sync_channel::<TransactionInfo>(4);
for &id_tx in tx_ids.iter() {
// need to keep tx order
if tx_ids_set.contains(&id_tx) {
@ -126,20 +147,48 @@ pub async fn retrieve_tx_info(
}
tx_ids_set.insert(id_tx);
let (account, height, tx_hash, ivk) = db.get_txhash(id_tx)?;
let fvk = fvk_cache.entry(account).or_insert_with(|| {
let fvk: &ExtendedFullViewingKey = fvk_cache.entry(account).or_insert_with(|| {
decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &ivk)
.unwrap()
.unwrap()
});
let tx_info = decode_transaction(client, &nf_map, account, fvk, &tx_hash, height).await?;
if !tx_info.address.is_empty() && !tx_info.memo.is_empty() {
if let Some(contact) = decode_contact(&tx_info.address, &tx_info.memo)? {
db.store_contact(account, &contact)?;
}
}
db.store_tx_metadata(id_tx, &tx_info)?;
let params = DecodeTxParams {
tx: tx.clone(),
client: client.clone(),
nf_map: &nf_map,
id_tx,
account,
fvk: fvk.clone(),
tx_hash: tx_hash.clone(),
height,
};
decode_tx_params.push(params);
}
db.commit()?;
let res = tokio_stream::iter(decode_tx_params).for_each_concurrent(None, |mut p| async move {
if let Ok(tx_info) = decode_transaction(&mut p.client, p.nf_map, p.id_tx, p.account, &p.fvk, &p.tx_hash, p.height).await {
p.tx.send(tx_info).unwrap();
drop(p.tx);
}
});
let f = tokio::spawn(async move {
while let Ok(tx_info) = rx.recv() {
if !tx_info.address.is_empty() && !tx_info.memo.is_empty() {
if let Some(contact) = decode_contact(&tx_info.address, &tx_info.memo)? {
db.store_contact(tx_info.account, &contact)?;
}
}
db.store_tx_metadata(tx_info.id_tx, &tx_info)?;
}
db.commit()?;
Ok::<_, anyhow::Error>(())
});
res.await;
drop(tx);
f.await??;
Ok(())
}
@ -185,7 +234,7 @@ mod tests {
decode_extended_full_viewing_key(NETWORK.hrp_sapling_extended_full_viewing_key(), &fvk)
.unwrap()
.unwrap();
let tx_info = decode_transaction(&mut client, &nf_map, account, &fvk, &tx_hash, 1313212)
let tx_info = decode_transaction(&mut client, &nf_map, 1, account, &fvk, &tx_hash, 1313212)
.await
.unwrap();
println!("{:?}", tx_info);