Make sync ignore known hashes (#853)

* fix: Handle known ObtainTips correctly

enumerate never returns a value beyond the end of the vector.

* fix: Ignore known tips in ExtendTips

Some peers send us known tips when we try to extend.

* fix: Ignore known hashes when downloading

Despite all our other checks, we still end up downloading some hashes
multiple times.

* fix: Increase the number of retries

The old sync code relied on duplicate block fetches to make progress,
but the last few commits have removed some of those duplicates.

Instead, just retry the fetches that fail.

* fix: Tweak comments

Co-authored-by: Jane Lusby <jlusby42@gmail.com>

* fix: Cleanup the state_contains interface in Sync

* Fix brackets

Oops

Co-authored-by: Jane Lusby <jlusby42@gmail.com>
This commit is contained in:
teor 2020-08-11 09:17:50 +10:00 committed by GitHub
parent c9093e4d59
commit 2550c44d48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 75 additions and 41 deletions

View File

@ -83,7 +83,8 @@ where
// ObtainTips Step 6
//
// If there are any prospective tips, call ExtendTips. Continue this step until there are no more prospective tips.
// If there are any prospective tips, call ExtendTips.
// Continue this step until there are no more prospective tips.
while !self.prospective_tips.is_empty() {
tracing::debug!("extending prospective tips");
@ -110,11 +111,6 @@ where
// making progress (probably using a timeout), then
// continue the loop with a new invocation of
// obtain_tips(), which will restart block downloads.
// this requires correctly constructing a block locator
// (TODO below) and ensuring that the verifier handles
// multiple requests for verification of the same block
// hash by handling both requests or by discarding the
// earlier request in favor of the later one.
Err(e) => tracing::error!(?e, "potentially transient error"),
};
}
@ -190,35 +186,29 @@ where
// list, prune any block hashes already included in the
// state, stopping at the first unknown hash to get resp1',
// ..., respF'. (These lists may be empty).
let mut first_unknown = 0;
let mut first_unknown = None;
for (i, &hash) in hashes.iter().enumerate() {
let depth = self
.state
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::GetDepth { hash })
.await
.map_err(|e| eyre!(e))?;
if let zs::Response::Depth(None) = depth {
first_unknown = i;
if !self.state_contains(hash).await? {
first_unknown = Some(i);
break;
}
}
// Hashes will be empty if we know about all the blocks in the response.
if first_unknown.is_none() {
tracing::debug!("ObtainTips: all hashes are known");
continue;
}
let first_unknown = first_unknown.expect("already checked for None");
tracing::debug!(
first_unknown,
"found index of first unknown hash in response"
);
if first_unknown == hashes.len() {
// We should only stop getting hashes once we've finished the initial sync
tracing::debug!("no new hashes, even though we gave our tip?");
continue;
}
let unknown_hashes = &hashes[first_unknown..];
let new_tip = *unknown_hashes
.last()
.expect("already checked first_unknown < hashes.len()");
.expect("already checked that unknown hashes isn't empty");
// ObtainTips Step 4:
// Combine the last elements of each list into a set; this is the
@ -297,23 +287,48 @@ where
// It indicates that the remote peer does not have any blocks
// following the prospective tip.
match (hashes.first(), hashes.len()) {
(_, 0) => {
tracing::debug!("skipping empty response");
(None, _) => {
tracing::debug!("ExtendTips: skipping empty response");
continue;
}
(_, 1) => {
tracing::debug!("skipping length-1 response, in case it's an unsolicited inv message");
tracing::debug!("ExtendTips: skipping length-1 response, in case it's an unsolicited inv message");
continue;
}
(Some(hash), _) if (hash == &self.genesis_hash) => {
tracing::debug!("skipping response, peer could not extend the tip");
tracing::debug!(
"ExtendTips: skipping response, peer could not extend the tip"
);
continue;
}
_ => {}
(Some(&hash), _) => {
// Check for hashes we've already seen.
// This happens a lot near the end of the chain.
// This check reduces the number of duplicate
// blocks, but it is not required for
// correctness.
if self.state_contains(hash).await? {
tracing::debug!(
?hash,
"ExtendTips: skipping response, peer returned a duplicate hash: already in state"
);
continue;
}
}
}
let new_tip = hashes.pop().expect("expected: hashes must have len > 0");
// Check for tips we've already seen
// TODO: remove this check once the sync service is more reliable
if self.state_contains(new_tip).await? {
tracing::debug!(
?new_tip,
"ExtendTips: Unexpected duplicate tip from peer: already in state"
);
continue;
}
// ExtendTips Step 4
//
// Combine the last elements of the remaining responses into
@ -372,19 +387,7 @@ where
// - the genesis hash is used as a placeholder for "no matches".
//
// So we just queue the genesis block here.
let state_has_genesis = self
.state
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::GetBlock {
hash: self.genesis_hash,
})
.await
.is_ok();
if !state_has_genesis {
if !self.state_contains(self.genesis_hash).await? {
self.request_blocks(vec![self.genesis_hash]).await?;
}
@ -395,6 +398,14 @@ where
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
for hash in hashes.into_iter() {
// TODO: remove this check once the sync service is more reliable
if self.state_contains(hash).await? {
tracing::debug!(
?hash,
"request_blocks: Unexpected duplicate hash: already in state"
);
continue;
}
// We construct the block download requests sequentially, waiting
// for the peer set to be ready to process each request. This
// ensures that we start block downloads in the order we want them
@ -433,6 +444,29 @@ where
Ok(())
}
/// Returns `Ok(true)` if the hash is present in the state, and `Ok(false)`
/// if the hash is not present in the state.
///
/// Returns `Err(_)` if an error occurs.
///
/// TODO: handle multiple tips in the state.
#[instrument(skip(self))]
async fn state_contains(&mut self, hash: BlockHeaderHash) -> Result<bool, Report> {
match self
.state
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::GetDepth { hash })
.await
.map_err(|e| eyre!(e))?
{
zs::Response::Depth(Some(_)) => Ok(true),
zs::Response::Depth(None) => Ok(false),
_ => unreachable!("wrong response to depth request"),
}
}
/// Update metrics gauges, and create a trace containing metrics.
fn update_metrics(&self) {
metrics::gauge!(