add(scan): Implement `ClearResults` `ScanService` request (#8219)
* implements ClearResults request * moves common code to another function * Applies suggestions from code review
This commit is contained in:
parent
80827f5294
commit
2bf16a3740
|
@ -1,5 +1,10 @@
|
||||||
//! `zebra_scan::service::ScanService` request types.
|
//! `zebra_scan::service::ScanService` request types.
|
||||||
|
|
||||||
|
use crate::BoxError;
|
||||||
|
|
||||||
|
/// The maximum number of keys that may be included in a request to the scan service
|
||||||
|
const MAX_REQUEST_KEYS: usize = 1000;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Request types for `zebra_scan::service::ScanService`
|
/// Request types for `zebra_scan::service::ScanService`
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
|
@ -21,6 +26,61 @@ pub enum Request {
|
||||||
/// TODO: Accept `KeyHash`es and return a channel receiver
|
/// TODO: Accept `KeyHash`es and return a channel receiver
|
||||||
SubscribeResults(Vec<()>),
|
SubscribeResults(Vec<()>),
|
||||||
|
|
||||||
/// TODO: Accept `KeyHash`es and return transaction ids
|
/// Clear the results for a set of viewing keys
|
||||||
ClearResults(Vec<()>),
|
ClearResults(Vec<String>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Request {
|
||||||
|
/// Check that the request data is valid for the request variant
|
||||||
|
pub fn check(&self) -> Result<(), BoxError> {
|
||||||
|
self.check_num_keys()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks that requests which include keys have a valid number of keys.
|
||||||
|
fn check_num_keys(&self) -> Result<(), BoxError> {
|
||||||
|
match self {
|
||||||
|
Request::DeleteKeys(keys) | Request::ClearResults(keys)
|
||||||
|
if keys.is_empty() || keys.len() > MAX_REQUEST_KEYS =>
|
||||||
|
{
|
||||||
|
Err(format!("request must include between 1 and {MAX_REQUEST_KEYS} keys").into())
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_check_num_keys() {
|
||||||
|
let fake_keys: Vec<_> = std::iter::repeat(String::new())
|
||||||
|
.take(MAX_REQUEST_KEYS + 1)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let bad_requests = [
|
||||||
|
Request::DeleteKeys(vec![]),
|
||||||
|
Request::DeleteKeys(fake_keys.clone()),
|
||||||
|
Request::ClearResults(vec![]),
|
||||||
|
Request::ClearResults(fake_keys),
|
||||||
|
];
|
||||||
|
|
||||||
|
let valid_requests = [
|
||||||
|
Request::DeleteKeys(vec![String::new()]),
|
||||||
|
Request::ClearResults(vec![String::new()]),
|
||||||
|
];
|
||||||
|
|
||||||
|
for request in bad_requests {
|
||||||
|
let error = request.check().expect_err("check should return an error");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
format!("request must include between 1 and {MAX_REQUEST_KEYS} keys"),
|
||||||
|
error.to_string(),
|
||||||
|
"check_num_keys should return an error because there are too many keys"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
for request in valid_requests {
|
||||||
|
request.check().expect("check should return Ok(())");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,9 @@ pub enum Response {
|
||||||
/// Response to DeleteKeys request
|
/// Response to DeleteKeys request
|
||||||
DeletedKeys,
|
DeletedKeys,
|
||||||
|
|
||||||
|
/// Response to ClearResults request
|
||||||
|
ClearedResults,
|
||||||
|
|
||||||
/// Response to SubscribeResults request
|
/// Response to SubscribeResults request
|
||||||
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
|
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,9 +26,6 @@ pub struct ScanService {
|
||||||
/// A timeout applied to `DeleteKeys` requests.
|
/// A timeout applied to `DeleteKeys` requests.
|
||||||
const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);
|
const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);
|
||||||
|
|
||||||
/// The maximum number of keys that may be included in a request to the scan service
|
|
||||||
const MAX_REQUEST_KEYS: usize = 1000;
|
|
||||||
|
|
||||||
impl ScanService {
|
impl ScanService {
|
||||||
/// Create a new [`ScanService`].
|
/// Create a new [`ScanService`].
|
||||||
pub fn new(
|
pub fn new(
|
||||||
|
@ -75,6 +72,10 @@ impl Service<Request> for ScanService {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: Request) -> Self::Future {
|
fn call(&mut self, req: Request) -> Self::Future {
|
||||||
|
if let Err(error) = req.check() {
|
||||||
|
return async move { Err(error) }.boxed();
|
||||||
|
}
|
||||||
|
|
||||||
match req {
|
match req {
|
||||||
Request::Info => {
|
Request::Info => {
|
||||||
let db = self.db.clone();
|
let db = self.db.clone();
|
||||||
|
@ -102,13 +103,6 @@ impl Service<Request> for ScanService {
|
||||||
let mut scan_task = self.scan_task.clone();
|
let mut scan_task = self.scan_task.clone();
|
||||||
|
|
||||||
return async move {
|
return async move {
|
||||||
if keys.len() > MAX_REQUEST_KEYS {
|
|
||||||
return Err(format!(
|
|
||||||
"maximum number of keys per request is {MAX_REQUEST_KEYS}"
|
|
||||||
)
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for a message to confirm that the scan task has removed the key up to `DELETE_KEY_TIMEOUT`
|
// Wait for a message to confirm that the scan task has removed the key up to `DELETE_KEY_TIMEOUT`
|
||||||
let remove_keys_result =
|
let remove_keys_result =
|
||||||
tokio::time::timeout(DELETE_KEY_TIMEOUT, scan_task.remove_keys(&keys)?)
|
tokio::time::timeout(DELETE_KEY_TIMEOUT, scan_task.remove_keys(&keys)?)
|
||||||
|
@ -118,7 +112,7 @@ impl Service<Request> for ScanService {
|
||||||
// Delete the key from the database after either confirmation that it's been removed from the scan task, or
|
// Delete the key from the database after either confirmation that it's been removed from the scan task, or
|
||||||
// waiting `DELETE_KEY_TIMEOUT`.
|
// waiting `DELETE_KEY_TIMEOUT`.
|
||||||
let delete_key_task = tokio::task::spawn_blocking(move || {
|
let delete_key_task = tokio::task::spawn_blocking(move || {
|
||||||
db.delete_sapling_results(keys);
|
db.delete_sapling_keys(keys);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Return timeout errors or `RecvError`s, or wait for the key to be deleted from the database.
|
// Return timeout errors or `RecvError`s, or wait for the key to be deleted from the database.
|
||||||
|
@ -138,8 +132,19 @@ impl Service<Request> for ScanService {
|
||||||
// TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller
|
// TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller
|
||||||
}
|
}
|
||||||
|
|
||||||
Request::ClearResults(_key_hashes) => {
|
Request::ClearResults(keys) => {
|
||||||
// TODO: clear results for these keys from db
|
let mut db = self.db.clone();
|
||||||
|
|
||||||
|
return async move {
|
||||||
|
// Clear results from db for the provided `keys`
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
db.delete_sapling_results(keys);
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Response::ClearedResults)
|
||||||
|
}
|
||||||
|
.boxed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,3 +77,58 @@ pub async fn scan_service_deletes_keys_correctly() -> Result<()> {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tests that results are cleared are deleted correctly
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn scan_service_clears_results_correctly() -> Result<()> {
|
||||||
|
let mut db = new_test_storage(Network::Mainnet);
|
||||||
|
|
||||||
|
let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();
|
||||||
|
|
||||||
|
for fake_result_height in [Height::MIN, Height(1), Height::MAX] {
|
||||||
|
db.insert_sapling_results(
|
||||||
|
&zec_pages_sapling_efvk,
|
||||||
|
fake_result_height,
|
||||||
|
fake_sapling_results([
|
||||||
|
TransactionIndex::MIN,
|
||||||
|
TransactionIndex::from_index(40),
|
||||||
|
TransactionIndex::MAX,
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!db.sapling_results(&zec_pages_sapling_efvk).is_empty(),
|
||||||
|
"there should be some results for this key in the db"
|
||||||
|
);
|
||||||
|
|
||||||
|
let (mut scan_service, _cmd_receiver) = ScanService::new_with_mock_scanner(db.clone());
|
||||||
|
|
||||||
|
let response = scan_service
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.map_err(|err| eyre!(err))?
|
||||||
|
.call(Request::ClearResults(vec![zec_pages_sapling_efvk.clone()]))
|
||||||
|
.await
|
||||||
|
.map_err(|err| eyre!(err))?;
|
||||||
|
|
||||||
|
match response {
|
||||||
|
Response::ClearedResults => {}
|
||||||
|
_ => panic!("scan service returned unexpected response variant"),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
db.sapling_results(&zec_pages_sapling_efvk).len(),
|
||||||
|
1,
|
||||||
|
"all results for this key should have been deleted, one empty entry should remain"
|
||||||
|
);
|
||||||
|
|
||||||
|
for (_, result) in db.sapling_results(&zec_pages_sapling_efvk) {
|
||||||
|
assert!(
|
||||||
|
result.is_empty(),
|
||||||
|
"there should be no results for this entry in the db"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
@ -234,18 +234,24 @@ impl Storage {
|
||||||
.expect("unexpected database write failure");
|
.expect("unexpected database write failure");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete the sapling keys and their results, if they exist,
|
||||||
|
pub(crate) fn delete_sapling_keys(&mut self, keys: Vec<SaplingScanningKey>) {
|
||||||
|
self.sapling_tx_ids_cf()
|
||||||
|
.new_batch_for_writing()
|
||||||
|
.delete_sapling_keys(keys)
|
||||||
|
.write_batch()
|
||||||
|
.expect("unexpected database write failure");
|
||||||
|
}
|
||||||
|
|
||||||
/// Delete the results of sapling scanning `keys`, if they exist
|
/// Delete the results of sapling scanning `keys`, if they exist
|
||||||
pub(crate) fn delete_sapling_results(&mut self, keys: Vec<SaplingScanningKey>) {
|
pub(crate) fn delete_sapling_results(&mut self, keys: Vec<SaplingScanningKey>) {
|
||||||
let mut batch = self.sapling_tx_ids_cf().new_batch_for_writing();
|
let mut batch = self
|
||||||
|
.sapling_tx_ids_cf()
|
||||||
|
.new_batch_for_writing()
|
||||||
|
.delete_sapling_keys(keys.clone());
|
||||||
|
|
||||||
for key in &keys {
|
for key in &keys {
|
||||||
let from = SaplingScannedDatabaseIndex::min_for_key(key);
|
batch = batch.insert_sapling_height(key, Height::MIN);
|
||||||
let until_strictly_before = SaplingScannedDatabaseIndex::max_for_key(key);
|
|
||||||
|
|
||||||
batch = batch
|
|
||||||
.zs_delete_range(&from, &until_strictly_before)
|
|
||||||
// TODO: convert zs_delete_range() to take std::ops::RangeBounds
|
|
||||||
.zs_delete(&until_strictly_before);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
batch
|
batch
|
||||||
|
@ -271,3 +277,25 @@ impl<'cf> InsertSaplingHeight for WriteSaplingTxIdsBatch<'cf> {
|
||||||
self.zs_insert(&index, &None)
|
self.zs_insert(&index, &None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Utility trait for deleting sapling keys in a WriteSaplingTxIdsBatch.
|
||||||
|
trait DeleteSaplingKeys {
|
||||||
|
fn delete_sapling_keys(self, sapling_key: Vec<SaplingScanningKey>) -> Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'cf> DeleteSaplingKeys for WriteSaplingTxIdsBatch<'cf> {
|
||||||
|
/// Delete sapling keys and their results.
|
||||||
|
fn delete_sapling_keys(mut self, sapling_keys: Vec<SaplingScanningKey>) -> Self {
|
||||||
|
for key in &sapling_keys {
|
||||||
|
let from_index = SaplingScannedDatabaseIndex::min_for_key(key);
|
||||||
|
let until_strictly_before_index = SaplingScannedDatabaseIndex::max_for_key(key);
|
||||||
|
|
||||||
|
self = self
|
||||||
|
.zs_delete_range(&from_index, &until_strictly_before_index)
|
||||||
|
// TODO: convert zs_delete_range() to take std::ops::RangeBounds
|
||||||
|
.zs_delete(&until_strictly_before_index);
|
||||||
|
}
|
||||||
|
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ pub fn deletes_keys_and_results_correctly() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
db.delete_sapling_results(vec![efvk.clone()]);
|
db.delete_sapling_keys(vec![efvk.clone()]);
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
db.sapling_results(efvk).is_empty(),
|
db.sapling_results(efvk).is_empty(),
|
||||||
|
@ -65,3 +65,69 @@ pub fn deletes_keys_and_results_correctly() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tests that keys are deleted correctly
|
||||||
|
#[test]
|
||||||
|
pub fn clears_results_correctly() {
|
||||||
|
let mut db = new_test_storage(Network::Mainnet);
|
||||||
|
|
||||||
|
let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();
|
||||||
|
|
||||||
|
// Replace the last letter of the zec_pages efvk
|
||||||
|
let fake_efvk = format!(
|
||||||
|
"{}t",
|
||||||
|
&ZECPAGES_SAPLING_VIEWING_KEY[..ZECPAGES_SAPLING_VIEWING_KEY.len() - 1]
|
||||||
|
);
|
||||||
|
|
||||||
|
let efvks = [&zec_pages_sapling_efvk, &fake_efvk];
|
||||||
|
let fake_heights = [Height::MIN, Height(1), Height::MAX];
|
||||||
|
let fake_transaction_indexes = [
|
||||||
|
TransactionIndex::MIN,
|
||||||
|
TransactionIndex::from_index(40),
|
||||||
|
TransactionIndex::MAX,
|
||||||
|
];
|
||||||
|
|
||||||
|
for efvk in efvks {
|
||||||
|
for fake_result_height in fake_heights {
|
||||||
|
db.insert_sapling_results(
|
||||||
|
efvk,
|
||||||
|
fake_result_height,
|
||||||
|
fake_sapling_results(fake_transaction_indexes),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let expected_num_entries = fake_heights.len();
|
||||||
|
let expected_num_results_per_entry = fake_transaction_indexes.len();
|
||||||
|
|
||||||
|
for efvk in efvks {
|
||||||
|
assert_eq!(
|
||||||
|
db.sapling_results(efvk).len(),
|
||||||
|
expected_num_entries,
|
||||||
|
"there should be {expected_num_entries} entries for this key in the db"
|
||||||
|
);
|
||||||
|
|
||||||
|
for (_, result) in db.sapling_results(efvk) {
|
||||||
|
assert_eq!(
|
||||||
|
result.len(),
|
||||||
|
expected_num_results_per_entry,
|
||||||
|
"there should be {expected_num_results_per_entry} results for this entry in the db"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
db.delete_sapling_results(vec![efvk.clone()]);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
db.sapling_results(efvk).len(),
|
||||||
|
1,
|
||||||
|
"all results for this key should have been deleted, one empty entry should remain"
|
||||||
|
);
|
||||||
|
|
||||||
|
for (_, result) in db.sapling_results(efvk) {
|
||||||
|
assert!(
|
||||||
|
result.is_empty(),
|
||||||
|
"there should be no results for this entry in the db"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue