consensus: avoid tower::ServiceExt::oneshot()

This is a really nice function but there might be a bug in its future
implementation: https://github.com/tower-rs/tower/issues/469

This bug may have already been fixed for the 0.4.0 release, so we could change
back then.
This commit is contained in:
Henry de Valence 2020-09-10 10:34:59 -07:00
parent 006596b9a9
commit 0d6303a56f
2 changed files with 20 additions and 20 deletions

View File

@ -134,12 +134,17 @@ where
metrics::counter!("block.verified.block.count", 1); metrics::counter!("block.verified.block.count", 1);
// Finally, submit the block for contextual verification. // Finally, submit the block for contextual verification.
match state_service.oneshot(zs::Request::CommitBlock{ block }).await? { match state_service
.ready_and()
.await?
.call(zs::Request::CommitBlock { block })
.await?
{
zs::Response::Committed(committed_hash) => { zs::Response::Committed(committed_hash) => {
assert_eq!(committed_hash, hash, "state returned wrong hash: hashes must be equal"); assert_eq!(committed_hash, hash, "state must commit correct hash");
Ok(hash) Ok(hash)
} }
_ => unreachable!("wrong response to CommitBlock"), _ => unreachable!("wrong response for CommitBlock"),
} }
} }
.boxed() .boxed()

View File

@ -770,8 +770,6 @@ where
} }
fn call(&mut self, block: Arc<Block>) -> Self::Future { fn call(&mut self, block: Arc<Block>) -> Self::Future {
let state_service = self.state_service.clone();
// Queue the block for verification, until we receive all the blocks for // Queue the block for verification, until we receive all the blocks for
// the current checkpoint range. // the current checkpoint range.
let rx = self.queue_block(block.clone()); let rx = self.queue_block(block.clone());
@ -789,26 +787,23 @@ where
metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64); metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64);
let mut state_service = self.state_service.clone();
async move { async move {
match rx let hash = rx
.await .await
.expect("CheckpointVerifier does not leave dangling receivers") .expect("CheckpointVerifier does not leave dangling receivers")?;
match state_service
.ready_and()
.await?
.call(zs::Request::CommitFinalizedBlock { block })
.await?
{ {
Ok(hash) => { zs::Response::Committed(committed_hash) => {
let verified_hash = match state_service assert_eq!(committed_hash, hash, "state must commit correct hash");
.oneshot(zs::Request::CommitFinalizedBlock { block })
.await?
{
zs::Response::Committed(hash) => hash,
_ => unreachable!("wrong response for CommitFinalizedBlock"),
};
assert_eq!(
verified_hash, hash,
"state service returned wrong hash: hashes must be equal"
);
Ok(hash) Ok(hash)
} }
Err(e) => Err(e), _ => unreachable!("wrong response for CommitFinalizedBlock"),
} }
} }
.boxed() .boxed()