Handle out of order responses

This commit is contained in:
Hanh 2022-07-12 15:28:58 +08:00
parent de3fc5a189
commit 7dfd64502c
4 changed files with 47 additions and 53 deletions

View File

@ -2,7 +2,7 @@
use crate::coinconfig::CoinConfig;
use crate::scan::AMProgressCallback;
use crate::{BlockId, CTree, CompactTxStreamerClient};
use crate::{BlockId, CTree, CompactTxStreamerClient, DbAdapter};
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Channel;
@ -88,8 +88,8 @@ async fn fetch_and_store_tree_state(
.await?
.into_inner();
let tree = CTree::read(&*hex::decode(&tree_state.sapling_tree)?)?;
c.db()?
.store_block(height, &block.hash, block.time, &tree)?;
let db = c.db()?;
DbAdapter::store_block(&db.connection, height, &block.hash, block.time, &tree)?;
Ok(())
}

View File

@ -96,53 +96,47 @@ pub async fn download_chain(
client: &mut CompactTxStreamerClient<Channel>,
start_height: u32,
end_height: u32,
chunk_size: u32,
mut prev_hash: Option<[u8; 32]>,
blocks_tx: Sender<Blocks>,
) -> anyhow::Result<()> {
let mut output_count = 0;
let mut cbs: Vec<CompactBlock> = Vec::new();
let mut s = start_height + 1;
while s <= end_height {
let e = (s + chunk_size - 1).min(end_height);
let range = BlockRange {
start: Some(BlockId {
height: s as u64,
hash: vec![],
}),
end: Some(BlockId {
height: e as u64,
hash: vec![],
}),
};
let mut block_stream = client
.get_block_range(Request::new(range))
.await?
.into_inner();
while let Some(mut block) = block_stream.message().await? {
if prev_hash.is_some() && block.prev_hash.as_slice() != prev_hash.unwrap() {
anyhow::bail!(ChainError::Reorg);
}
let mut ph = [0u8; 32];
ph.copy_from_slice(&block.hash);
prev_hash = Some(ph);
for b in block.vtx.iter_mut() {
b.actions.clear(); // don't need Orchard actions
}
let block_output_count: usize = block.vtx.iter().map(|tx| tx.outputs.len()).sum();
if output_count + block_output_count > MAX_OUTPUTS_PER_CHUNK {
// output
let out = cbs;
cbs = Vec::new();
blocks_tx.send(Blocks(out)).await.unwrap();
output_count = 0;
}
cbs.push(block);
output_count += block_output_count;
let range = BlockRange {
start: Some(BlockId {
height: (start_height + 1) as u64,
hash: vec![],
}),
end: Some(BlockId {
height: end_height as u64,
hash: vec![],
}),
};
let mut block_stream = client
.get_block_range(Request::new(range))
.await?
.into_inner();
while let Some(mut block) = block_stream.message().await? {
if prev_hash.is_some() && block.prev_hash.as_slice() != prev_hash.unwrap() {
anyhow::bail!(ChainError::Reorg);
}
s = e + 1;
let mut ph = [0u8; 32];
ph.copy_from_slice(&block.hash);
prev_hash = Some(ph);
for b in block.vtx.iter_mut() {
b.actions.clear(); // don't need Orchard actions
}
let block_output_count: usize = block.vtx.iter().map(|tx| tx.outputs.len()).sum();
if output_count + block_output_count > MAX_OUTPUTS_PER_CHUNK {
// output
let out = cbs;
cbs = Vec::new();
blocks_tx.send(Blocks(out)).await.unwrap();
output_count = 0;
}
cbs.push(block);
output_count += block_output_count;
}
let _ = blocks_tx.send(Blocks(cbs)).await;
Ok(())

View File

@ -215,7 +215,7 @@ impl DbAdapter {
}
pub fn store_block(
&self,
connection: &Connection,
height: u32,
hash: &[u8],
timestamp: u32,
@ -224,7 +224,7 @@ impl DbAdapter {
log::debug!("+block");
let mut bb: Vec<u8> = vec![];
tree.write(&mut bb)?;
self.connection.execute(
connection.execute(
"INSERT INTO blocks(height, hash, timestamp, sapling_tree)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT DO NOTHING",
@ -278,7 +278,7 @@ impl DbAdapter {
}
pub fn store_witnesses(
&self,
connection: &Connection,
witness: &Witness,
height: u32,
id_note: u32,
@ -286,7 +286,7 @@ impl DbAdapter {
log::debug!("+witnesses");
let mut bb: Vec<u8> = vec![];
witness.write(&mut bb)?;
self.connection.execute(
connection.execute(
"INSERT INTO sapling_witnesses(note, height, witness) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING",
params![id_note, height, bb],
@ -1072,7 +1072,7 @@ mod tests {
cursor: CTree::new(),
};
db_tx.commit().unwrap();
db.store_witnesses(&witness, 1000, 1).unwrap();
Db::store_witnesses(&witness, 1000, 1).unwrap();
}
#[test]

View File

@ -63,7 +63,6 @@ pub async fn scan_all(network: &Network, fvks: &[ExtendedFullViewingKey]) -> any
&mut client,
start_height,
end_height,
100_000,
None,
blocks_tx,
)
@ -137,7 +136,6 @@ pub async fn sync_async(
&mut client,
start_height,
end_height,
chunk_size,
prev_hash,
processor_tx,
)
@ -323,11 +321,13 @@ pub async fn sync_async(
witnesses = new_witnesses;
if let Some(block) = blocks.0.last() {
let mut db_transaction = db.begin_transaction()?;
let height = block.height as u32;
for w in witnesses.iter() {
db.store_witnesses(w, height, w.id_note)?;
DbAdapter::store_witnesses(&db_transaction, w, height, w.id_note)?;
}
db.store_block(height, &block.hash, block.time, &tree)?;
DbAdapter::store_block(&mut db_transaction, height, &block.hash, block.time, &tree)?;
db_transaction.commit()?;
}
}