cleanup(state): condenses match arms for redirected StateService requests and moves metrics counting to Request methods (#5137)

* Move AwaitUtxos next to the other shared writeable state requests

* Rename ReadResponse::Utxos to ReadResponse::AddressUtxos

```sh
fastmod Utxos AddressUtxos zebra*
```

* Rename an out_point variable to outpoint for consistency

* Rename transparent_utxos to address_utxos

```sh
fastmod transparent_utxos address_utxos zebra*
```

* Run AwaitUtxo without accessing shared mutable chain state

* Fix some incorrect comments

* Explain why some concurrent reads are ok

* Add a TODO

* Stop using self.mem in AwaitUtxo requests

* Update state service module documentation

* Move the QueuedBlock type into the queued_blocks module

* Explain how spent UTXOs are treated by the state

* Clarify how cached Chains impact state read requests

And move repeated comments to the module header.

* fastmod ChainUtxo BestChainUtxo zebra*

* Add an AnyChainUtxo request

* Make AwaitUtxo non-blocking

* Move the finalized block queue into the StateService

* Move the queued_blocks module to the state service

* Move QueuedFinalized into queued_blocks

* Move the queued_blocks tests into their own module

* Make the FinalizedState cloneable

* cleanup of repetitive code

* fixes merge by adding back req.count_metric and removing the metrics::counter in AwaitUtxo

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Arya 2022-09-16 15:03:56 -04:00 committed by GitHub
parent 64d984340d
commit 2fa1879fb1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 279 deletions

View File

@ -511,6 +511,33 @@ pub enum Request {
}, },
} }
impl Request {
fn variant_name(&self) -> &'static str {
match self {
Request::CommitBlock(_) => "commit_block",
Request::CommitFinalizedBlock(_) => "commit_finalized_block",
Request::AwaitUtxo(_) => "await_utxo",
Request::Depth(_) => "depth",
Request::Tip => "tip",
Request::BlockLocator => "block_locator",
Request::Transaction(_) => "transaction",
Request::Block(_) => "block",
Request::FindBlockHashes { .. } => "find_block_hashes",
Request::FindBlockHeaders { .. } => "find_block_headers",
}
}
/// Counts metric for StateService call
pub fn count_metric(&self) {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => self.variant_name()
);
}
}
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
/// A read-only query about the chain state, via the /// A read-only query about the chain state, via the
/// [`ReadStateService`](crate::service::ReadStateService). /// [`ReadStateService`](crate::service::ReadStateService).
@ -670,6 +697,37 @@ pub enum ReadRequest {
UtxosByAddresses(HashSet<transparent::Address>), UtxosByAddresses(HashSet<transparent::Address>),
} }
impl ReadRequest {
fn variant_name(&self) -> &'static str {
match self {
ReadRequest::Tip => "tip",
ReadRequest::Depth(_) => "depth",
ReadRequest::Block(_) => "block",
ReadRequest::Transaction(_) => "transaction",
ReadRequest::BestChainUtxo { .. } => "best_chain_utxo",
ReadRequest::AnyChainUtxo { .. } => "any_chain_utxo",
ReadRequest::BlockLocator => "block_locator",
ReadRequest::FindBlockHashes { .. } => "find_block_hashes",
ReadRequest::FindBlockHeaders { .. } => "find_block_headers",
ReadRequest::SaplingTree { .. } => "sapling_tree",
ReadRequest::OrchardTree { .. } => "orchard_tree",
ReadRequest::AddressBalance { .. } => "address_balance",
ReadRequest::TransactionIdsByAddresses { .. } => "transaction_ids_by_addesses",
ReadRequest::UtxosByAddresses(_) => "utxos_by_addesses",
}
}
/// Counts metric for ReadStateService call
pub fn count_metric(&self) {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => self.variant_name()
);
}
}
/// Conversion from read-write [`Request`]s to read-only [`ReadRequest`]s. /// Conversion from read-write [`Request`]s to read-only [`ReadRequest`]s.
/// ///
/// Used to dispatch read requests concurrently from the [`StateService`](crate::service::StateService). /// Used to dispatch read requests concurrently from the [`StateService`](crate::service::StateService).

View File

@ -647,17 +647,12 @@ impl Service<Request> for StateService {
#[instrument(name = "state", skip(self, req))] #[instrument(name = "state", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, req: Request) -> Self::Future {
req.count_metric();
match req { match req {
// Uses queued_non_finalized_blocks and pending_utxos in the StateService // Uses queued_non_finalized_blocks and pending_utxos in the StateService
// Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb. // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
Request::CommitBlock(prepared) => { Request::CommitBlock(prepared) => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "commit_block",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
self.assert_block_can_be_validated(&prepared); self.assert_block_can_be_validated(&prepared);
@ -703,13 +698,6 @@ impl Service<Request> for StateService {
// Uses queued_finalized_blocks and pending_utxos in the StateService. // Uses queued_finalized_blocks and pending_utxos in the StateService.
// Accesses shared writeable state in the StateService and ZebraDb. // Accesses shared writeable state in the StateService and ZebraDb.
Request::CommitFinalizedBlock(finalized) => { Request::CommitFinalizedBlock(finalized) => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "commit_finalized_block",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
// # Consensus // # Consensus
@ -758,13 +746,6 @@ impl Service<Request> for StateService {
// Uses pending_utxos and queued_non_finalized_blocks in the StateService. // Uses pending_utxos and queued_non_finalized_blocks in the StateService.
// If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService. // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
Request::AwaitUtxo(outpoint) => { Request::AwaitUtxo(outpoint) => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "await_utxo",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
// Prepare the AwaitUtxo future from PendingUxtos. // Prepare the AwaitUtxo future from PendingUxtos.
@ -828,167 +809,14 @@ impl Service<Request> for StateService {
.boxed() .boxed()
} }
// TODO: add a name() method to Request, and combine all the generic read requests
//
// Runs concurrently using the ReadStateService // Runs concurrently using the ReadStateService
Request::Depth(_) => { Request::Depth(_)
metrics::counter!( | Request::Tip
"state.requests", | Request::BlockLocator
1, | Request::Transaction(_)
"service" => "state", | Request::Block(_)
"type" => "depth", | Request::FindBlockHashes { .. }
); | Request::FindBlockHeaders { .. } => {
// Redirect the request to the concurrent ReadStateService
let read_service = self.read_service.clone();
async move {
let req = req
.try_into()
.expect("ReadRequest conversion should not fail");
let rsp = read_service.oneshot(req).await?;
let rsp = rsp.try_into().expect("Response conversion should not fail");
Ok(rsp)
}
.boxed()
}
// Runs concurrently using the ReadStateService
Request::Tip => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "tip",
);
// Redirect the request to the concurrent ReadStateService
let read_service = self.read_service.clone();
async move {
let req = req
.try_into()
.expect("ReadRequest conversion should not fail");
let rsp = read_service.oneshot(req).await?;
let rsp = rsp.try_into().expect("Response conversion should not fail");
Ok(rsp)
}
.boxed()
}
// Runs concurrently using the ReadStateService
Request::BlockLocator => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "block_locator",
);
// Redirect the request to the concurrent ReadStateService
let read_service = self.read_service.clone();
async move {
let req = req
.try_into()
.expect("ReadRequest conversion should not fail");
let rsp = read_service.oneshot(req).await?;
let rsp = rsp.try_into().expect("Response conversion should not fail");
Ok(rsp)
}
.boxed()
}
// Runs concurrently using the ReadStateService
Request::Transaction(_) => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "transaction",
);
// Redirect the request to the concurrent ReadStateService
let read_service = self.read_service.clone();
async move {
let req = req
.try_into()
.expect("ReadRequest conversion should not fail");
let rsp = read_service.oneshot(req).await?;
let rsp = rsp.try_into().expect("Response conversion should not fail");
Ok(rsp)
}
.boxed()
}
// Runs concurrently using the ReadStateService
Request::Block(_) => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "block",
);
// Redirect the request to the concurrent ReadStateService
let read_service = self.read_service.clone();
async move {
let req = req
.try_into()
.expect("ReadRequest conversion should not fail");
let rsp = read_service.oneshot(req).await?;
let rsp = rsp.try_into().expect("Response conversion should not fail");
Ok(rsp)
}
.boxed()
}
// Runs concurrently using the ReadStateService
Request::FindBlockHashes { .. } => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "find_block_hashes",
);
// Redirect the request to the concurrent ReadStateService
let read_service = self.read_service.clone();
async move {
let req = req
.try_into()
.expect("ReadRequest conversion should not fail");
let rsp = read_service.oneshot(req).await?;
let rsp = rsp.try_into().expect("Response conversion should not fail");
Ok(rsp)
}
.boxed()
}
// Runs concurrently using the ReadStateService
Request::FindBlockHeaders { .. } => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "find_block_headers",
);
// Redirect the request to the concurrent ReadStateService // Redirect the request to the concurrent ReadStateService
let read_service = self.read_service.clone(); let read_service = self.read_service.clone();
@ -1020,16 +848,11 @@ impl Service<ReadRequest> for ReadStateService {
#[instrument(name = "read_state", skip(self))] #[instrument(name = "read_state", skip(self))]
fn call(&mut self, req: ReadRequest) -> Self::Future { fn call(&mut self, req: ReadRequest) -> Self::Future {
req.count_metric();
match req { match req {
// Used by the StateService. // Used by the StateService.
ReadRequest::Tip => { ReadRequest::Tip => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "tip",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1055,13 +878,6 @@ impl Service<ReadRequest> for ReadStateService {
// Used by the StateService. // Used by the StateService.
ReadRequest::Depth(hash) => { ReadRequest::Depth(hash) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "depth",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1087,13 +903,6 @@ impl Service<ReadRequest> for ReadStateService {
// Used by get_block RPC and the StateService. // Used by get_block RPC and the StateService.
ReadRequest::Block(hash_or_height) => { ReadRequest::Block(hash_or_height) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "block",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1123,13 +932,6 @@ impl Service<ReadRequest> for ReadStateService {
// For the get_raw_transaction RPC and the StateService. // For the get_raw_transaction RPC and the StateService.
ReadRequest::Transaction(hash) => { ReadRequest::Transaction(hash) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "transaction",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1155,13 +957,6 @@ impl Service<ReadRequest> for ReadStateService {
// Currently unused. // Currently unused.
ReadRequest::BestChainUtxo(outpoint) => { ReadRequest::BestChainUtxo(outpoint) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "best_chain_utxo",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1187,13 +982,6 @@ impl Service<ReadRequest> for ReadStateService {
// Manually used by the StateService to implement part of AwaitUtxo. // Manually used by the StateService to implement part of AwaitUtxo.
ReadRequest::AnyChainUtxo(outpoint) => { ReadRequest::AnyChainUtxo(outpoint) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "any_chain_utxo",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1219,13 +1007,6 @@ impl Service<ReadRequest> for ReadStateService {
// Used by the StateService. // Used by the StateService.
ReadRequest::BlockLocator => { ReadRequest::BlockLocator => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "block_locator",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1253,13 +1034,6 @@ impl Service<ReadRequest> for ReadStateService {
// Used by the StateService. // Used by the StateService.
ReadRequest::FindBlockHashes { known_blocks, stop } => { ReadRequest::FindBlockHashes { known_blocks, stop } => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "find_block_hashes",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1291,13 +1065,6 @@ impl Service<ReadRequest> for ReadStateService {
// Used by the StateService. // Used by the StateService.
ReadRequest::FindBlockHeaders { known_blocks, stop } => { ReadRequest::FindBlockHeaders { known_blocks, stop } => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "find_block_headers",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1333,13 +1100,6 @@ impl Service<ReadRequest> for ReadStateService {
} }
ReadRequest::SaplingTree(hash_or_height) => { ReadRequest::SaplingTree(hash_or_height) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "sapling_tree",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1368,13 +1128,6 @@ impl Service<ReadRequest> for ReadStateService {
} }
ReadRequest::OrchardTree(hash_or_height) => { ReadRequest::OrchardTree(hash_or_height) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "orchard_tree",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1404,13 +1157,6 @@ impl Service<ReadRequest> for ReadStateService {
// For the get_address_balance RPC. // For the get_address_balance RPC.
ReadRequest::AddressBalance(addresses) => { ReadRequest::AddressBalance(addresses) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "address_balance",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1443,13 +1189,6 @@ impl Service<ReadRequest> for ReadStateService {
addresses, addresses,
height_range, height_range,
} => { } => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "transaction_ids_by_addresses",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
@ -1486,13 +1225,6 @@ impl Service<ReadRequest> for ReadStateService {
// For the get_address_utxos RPC. // For the get_address_utxos RPC.
ReadRequest::UtxosByAddresses(addresses) => { ReadRequest::UtxosByAddresses(addresses) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "utxos_by_addresses",
);
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();