add(scan): Implement `Results` request (#8224)
* implement Results service call * call `sapling_results_for_key` from a blocking thread Co-authored-by: Arya <aryasolhi@gmail.com> --------- Co-authored-by: Arya <aryasolhi@gmail.com>
This commit is contained in:
parent
5feb40e0b8
commit
052f235ba3
|
@ -20,8 +20,8 @@ pub enum Request {
|
||||||
/// Deletes viewing keys and their results from the database.
|
/// Deletes viewing keys and their results from the database.
|
||||||
DeleteKeys(Vec<String>),
|
DeleteKeys(Vec<String>),
|
||||||
|
|
||||||
/// TODO: Accept `KeyHash`es and return `Transaction`s
|
/// Accept keys and return transaction data
|
||||||
Results(Vec<()>),
|
Results(Vec<String>),
|
||||||
|
|
||||||
/// TODO: Accept `KeyHash`es and return a channel receiver
|
/// TODO: Accept `KeyHash`es and return a channel receiver
|
||||||
SubscribeResults(Vec<()>),
|
SubscribeResults(Vec<()>),
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
//! `zebra_scan::service::ScanService` response types.
|
//! `zebra_scan::service::ScanService` response types.
|
||||||
|
|
||||||
use std::sync::{mpsc, Arc};
|
use std::{
|
||||||
|
collections::BTreeMap,
|
||||||
|
sync::{mpsc, Arc},
|
||||||
|
};
|
||||||
|
|
||||||
use zebra_chain::{block::Height, transaction::Transaction};
|
use zebra_chain::{block::Height, transaction::Hash};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Response types for `zebra_scan::service::ScanService`
|
/// Response types for `zebra_scan::service::ScanService`
|
||||||
|
@ -14,7 +17,9 @@ pub enum Response {
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Response to Results request
|
/// Response to Results request
|
||||||
Results(Vec<Transaction>),
|
///
|
||||||
|
/// We use the nested `BTreeMap` so we don't repeat any piece of response data.
|
||||||
|
Results(BTreeMap<String, BTreeMap<Height, Vec<Hash>>>),
|
||||||
|
|
||||||
/// Response to DeleteKeys request
|
/// Response to DeleteKeys request
|
||||||
DeletedKeys,
|
DeletedKeys,
|
||||||
|
@ -23,5 +28,5 @@ pub enum Response {
|
||||||
ClearedResults,
|
ClearedResults,
|
||||||
|
|
||||||
/// Response to SubscribeResults request
|
/// Response to SubscribeResults request
|
||||||
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
|
SubscribeResults(mpsc::Receiver<Arc<Hash>>),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
//! [`tower::Service`] for zebra-scan.
|
//! [`tower::Service`] for zebra-scan.
|
||||||
|
|
||||||
use std::{future::Future, pin::Pin, task::Poll, time::Duration};
|
use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Duration};
|
||||||
|
|
||||||
use futures::future::FutureExt;
|
use futures::future::FutureExt;
|
||||||
use tower::Service;
|
use tower::Service;
|
||||||
|
|
||||||
use zebra_chain::parameters::Network;
|
use zebra_chain::{parameters::Network, transaction::Hash};
|
||||||
|
|
||||||
use zebra_state::ChainTipChange;
|
use zebra_state::ChainTipChange;
|
||||||
|
|
||||||
use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
|
use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
|
||||||
|
@ -124,8 +125,31 @@ impl Service<Request> for ScanService {
|
||||||
.boxed();
|
.boxed();
|
||||||
}
|
}
|
||||||
|
|
||||||
Request::Results(_key_hashes) => {
|
Request::Results(keys) => {
|
||||||
// TODO: read results from db
|
let db = self.db.clone();
|
||||||
|
|
||||||
|
return async move {
|
||||||
|
let mut final_result = BTreeMap::new();
|
||||||
|
for key in keys {
|
||||||
|
let db = db.clone();
|
||||||
|
let mut heights_and_transactions = BTreeMap::new();
|
||||||
|
let txs = {
|
||||||
|
let key = key.clone();
|
||||||
|
tokio::task::spawn_blocking(move || db.sapling_results_for_key(&key))
|
||||||
|
}
|
||||||
|
.await?;
|
||||||
|
txs.iter().for_each(|(k, v)| {
|
||||||
|
heights_and_transactions
|
||||||
|
.entry(*k)
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.extend(v.iter().map(|x| Hash::from(*x)));
|
||||||
|
});
|
||||||
|
final_result.entry(key).or_insert(heights_and_transactions);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Response::Results(final_result))
|
||||||
|
}
|
||||||
|
.boxed();
|
||||||
}
|
}
|
||||||
|
|
||||||
Request::SubscribeResults(_key_hashes) => {
|
Request::SubscribeResults(_key_hashes) => {
|
||||||
|
@ -148,6 +172,6 @@ impl Service<Request> for ScanService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async move { Ok(Response::Results(vec![])) }.boxed()
|
async move { Ok(Response::Results(BTreeMap::new())) }.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,3 +132,79 @@ pub async fn scan_service_clears_results_correctly() -> Result<()> {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tests that results for key are returned correctly
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn scan_service_get_results_for_key_correctly() -> Result<()> {
|
||||||
|
let mut db = new_test_storage(Network::Mainnet);
|
||||||
|
|
||||||
|
let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();
|
||||||
|
|
||||||
|
for fake_result_height in [Height::MIN, Height(1), Height::MAX] {
|
||||||
|
db.insert_sapling_results(
|
||||||
|
&zec_pages_sapling_efvk,
|
||||||
|
fake_result_height,
|
||||||
|
fake_sapling_results([
|
||||||
|
TransactionIndex::MIN,
|
||||||
|
TransactionIndex::from_index(40),
|
||||||
|
TransactionIndex::MAX,
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
db.sapling_results(&zec_pages_sapling_efvk).len() == 3,
|
||||||
|
"there should be 3 heights for this key in the db"
|
||||||
|
);
|
||||||
|
|
||||||
|
for (_height, transactions) in db.sapling_results(&zec_pages_sapling_efvk) {
|
||||||
|
assert!(
|
||||||
|
transactions.len() == 3,
|
||||||
|
"there should be 3 transactions for each height for this key in the db"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't need to send any command to the scanner for this call.
|
||||||
|
let (mut scan_service, _cmd_receiver) = ScanService::new_with_mock_scanner(db);
|
||||||
|
|
||||||
|
let response_fut = scan_service
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.map_err(|err| eyre!(err))?
|
||||||
|
.call(Request::Results(vec![zec_pages_sapling_efvk.clone()]));
|
||||||
|
|
||||||
|
match response_fut.await.map_err(|err| eyre!(err))? {
|
||||||
|
Response::Results(results) => {
|
||||||
|
assert!(
|
||||||
|
results.contains_key(&zec_pages_sapling_efvk),
|
||||||
|
"results should contain the requested key"
|
||||||
|
);
|
||||||
|
assert!(results.len() == 1, "values are only for 1 key");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
results
|
||||||
|
.get_key_value(&zec_pages_sapling_efvk)
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.len()
|
||||||
|
== 3,
|
||||||
|
"we should have 3 heights for the given key "
|
||||||
|
);
|
||||||
|
|
||||||
|
for transactions in results
|
||||||
|
.get_key_value(&zec_pages_sapling_efvk)
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.values()
|
||||||
|
{
|
||||||
|
assert!(
|
||||||
|
transactions.len() == 3,
|
||||||
|
"there should be 3 transactions for each height for this key"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => panic!("scan service returned unexpected response variant"),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue