diff --git a/zebrad/src/commands/start/sync.rs b/zebrad/src/commands/start/sync.rs index 106add6fa..2c56e4451 100644 --- a/zebrad/src/commands/start/sync.rs +++ b/zebrad/src/commands/start/sync.rs @@ -83,7 +83,8 @@ where // ObtainTips Step 6 // - // If there are any prospective tips, call ExtendTips. Continue this step until there are no more prospective tips. + // If there are any prospective tips, call ExtendTips. + // Continue this step until there are no more prospective tips. while !self.prospective_tips.is_empty() { tracing::debug!("extending prospective tips"); @@ -110,11 +111,6 @@ where // making progress (probably using a timeout), then // continue the loop with a new invocation of // obtain_tips(), which will restart block downloads. - // this requires correctly constructing a block locator - // (TODO below) and ensuring that the verifier handles - // multiple requests for verification of the same block - // hash by handling both requests or by discarding the - // earlier request in favor of the later one. Err(e) => tracing::error!(?e, "potentially transient error"), }; } @@ -190,35 +186,29 @@ where // list, prune any block hashes already included in the // state, stopping at the first unknown hash to get resp1', // ..., respF'. (These lists may be empty). - let mut first_unknown = 0; + let mut first_unknown = None; for (i, &hash) in hashes.iter().enumerate() { - let depth = self - .state - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::GetDepth { hash }) - .await - .map_err(|e| eyre!(e))?; - if let zs::Response::Depth(None) = depth { - first_unknown = i; + if !self.state_contains(hash).await? { + first_unknown = Some(i); break; } } + + // Hashes will be empty if we know about all the blocks in the response. 
+ if first_unknown.is_none() { + tracing::debug!("ObtainTips: all hashes are known"); + continue; + } + let first_unknown = first_unknown.expect("already checked for None"); tracing::debug!( first_unknown, "found index of first unknown hash in response" ); - if first_unknown == hashes.len() { - // We should only stop getting hashes once we've finished the initial sync - tracing::debug!("no new hashes, even though we gave our tip?"); - continue; - } let unknown_hashes = &hashes[first_unknown..]; let new_tip = *unknown_hashes .last() - .expect("already checked first_unknown < hashes.len()"); + .expect("already checked that unknown hashes isn't empty"); // ObtainTips Step 4: // Combine the last elements of each list into a set; this is the @@ -297,23 +287,48 @@ where // It indicates that the remote peer does not have any blocks // following the prospective tip. match (hashes.first(), hashes.len()) { - (_, 0) => { - tracing::debug!("skipping empty response"); + (None, _) => { + tracing::debug!("ExtendTips: skipping empty response"); continue; } (_, 1) => { - tracing::debug!("skipping length-1 response, in case it's an unsolicited inv message"); + tracing::debug!("ExtendTips: skipping length-1 response, in case it's an unsolicited inv message"); continue; } (Some(hash), _) if (hash == &self.genesis_hash) => { - tracing::debug!("skipping response, peer could not extend the tip"); + tracing::debug!( + "ExtendTips: skipping response, peer could not extend the tip" + ); continue; } - _ => {} + (Some(&hash), _) => { + // Check for hashes we've already seen. + // This happens a lot near the end of the chain. + // This check reduces the number of duplicate + // blocks, but it is not required for + // correctness. + if self.state_contains(hash).await? 
{ + tracing::debug!( + ?hash, + "ExtendTips: skipping response, peer returned a duplicate hash: already in state" + ); + continue; + } + } } let new_tip = hashes.pop().expect("expected: hashes must have len > 0"); + // Check for tips we've already seen + // TODO: remove this check once the sync service is more reliable + if self.state_contains(new_tip).await? { + tracing::debug!( + ?new_tip, + "ExtendTips: Unexpected duplicate tip from peer: already in state" + ); + continue; + } + // ExtendTips Step 4 // // Combine the last elements of the remaining responses into @@ -372,19 +387,7 @@ where // - the genesis hash is used as a placeholder for "no matches". // // So we just queue the genesis block here. - - let state_has_genesis = self - .state - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::GetBlock { - hash: self.genesis_hash, - }) - .await - .is_ok(); - - if !state_has_genesis { + if !self.state_contains(self.genesis_hash).await? { self.request_blocks(vec![self.genesis_hash]).await?; } @@ -395,6 +398,14 @@ where async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> { tracing::debug!(hashes.len = hashes.len(), "requesting blocks"); for hash in hashes.into_iter() { + // TODO: remove this check once the sync service is more reliable + if self.state_contains(hash).await? { + tracing::debug!( + ?hash, + "request_blocks: Unexpected duplicate hash: already in state" + ); + continue; + } // We construct the block download requests sequentially, waiting // for the peer set to be ready to process each request. This // ensures that we start block downloads in the order we want them @@ -433,6 +444,29 @@ where Ok(()) } + /// Returns `Ok(true)` if the hash is present in the state, and `Ok(false)` + /// if the hash is not present in the state. + /// + /// Returns `Err(_)` if an error occurs. + /// + /// TODO: handle multiple tips in the state. 
+ #[instrument(skip(self))] + async fn state_contains(&mut self, hash: BlockHeaderHash) -> Result<bool, Report> { + match self + .state + .ready_and() + .await + .map_err(|e| eyre!(e))? + .call(zebra_state::Request::GetDepth { hash }) + .await + .map_err(|e| eyre!(e))? + { + zs::Response::Depth(Some(_)) => Ok(true), + zs::Response::Depth(None) => Ok(false), + _ => unreachable!("wrong response to depth request"), + } + } + /// Update metrics gauges, and create a trace containing metrics. fn update_metrics(&self) { metrics::gauge!(