From 926bb0c794e3ea96f4b8214bb4a7d95f2e67ee02 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 7 Apr 2023 11:17:36 -0700 Subject: [PATCH] MultiIteratorScanner::finalize returns (payload, already_processed) (#31054) * finalize() returns (payload, already_processed) * Additional testing around already_handled * Return type wrapper and comment update --- core/src/multi_iterator_scanner.rs | 55 ++++++++++++++++++--- core/src/unprocessed_transaction_storage.rs | 4 +- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/core/src/multi_iterator_scanner.rs b/core/src/multi_iterator_scanner.rs index 21c71bb468..b7f96ef63c 100644 --- a/core/src/multi_iterator_scanner.rs +++ b/core/src/multi_iterator_scanner.rs @@ -81,6 +81,11 @@ where initialized: bool, } +pub struct PayloadAndAlreadyHandled { + pub payload: U, + pub already_handled: Vec, +} + impl<'a, T, U, F> MultiIteratorScanner<'a, T, U, F> where F: FnMut(&T, &mut U) -> ProcessingDecision, @@ -113,9 +118,13 @@ where self.get_current_items() } - /// Consume the iterator and return the payload. - pub fn finalize(self) -> U { - self.payload + /// Consume the iterator. Return the payload, and a vector of booleans + /// indicating which items have been handled. + pub fn finalize(self) -> PayloadAndAlreadyHandled { + PayloadAndAlreadyHandled { + payload: self.payload, + already_handled: self.already_handled, + } } /// Initialize the `current_positions` vector for the first batch. @@ -192,7 +201,7 @@ where #[cfg(test)] mod tests { - use {super::MultiIteratorScanner, crate::multi_iterator_scanner::ProcessingDecision}; + use super::*; struct TestScannerPayload { locks: Vec, @@ -282,8 +291,12 @@ mod tests { let expected_batches = vec![vec![&0, &1], vec![&0, &2], vec![&0, &3], vec![&1]]; assert_eq!(actual_batches, expected_batches); - let TestScannerPayload { locks } = scanner.finalize(); + let PayloadAndAlreadyHandled { + payload: TestScannerPayload { locks }, + already_handled, + } = scanner.finalize(); assert_eq!(locks, vec![false; 4]); + assert!(already_handled.into_iter().all(|x| x)); } #[test] @@ -325,8 +338,12 @@ mod tests { ]; assert_eq!(actual_batches, expected_batches); - let TestScannerPayload { locks } = scanner.finalize(); + let PayloadAndAlreadyHandled { + payload: TestScannerPayload { locks }, + already_handled, + } = scanner.finalize(); assert_eq!(locks, vec![false; 4]); + assert!(already_handled.into_iter().all(|x| x)); } #[test] @@ -350,4 +367,30 @@ mod tests { let expected_batches = vec![vec![&0, &1], vec![&2]]; assert_eq!(actual_batches, expected_batches); } + + #[test] + fn test_multi_iterator_scanner_iterate_not_handled() { + let slice = [0, 1, 2]; + + // 0 and 2 will always be marked as later, and never actually handled + let should_process = |item: &i32, _payload: &mut ()| match item { + 1 => ProcessingDecision::Now, + _ => ProcessingDecision::Later, + }; + + let mut scanner = MultiIteratorScanner::new(&slice, 2, (), should_process); + let mut actual_batches = vec![]; + while let Some((batch, _payload)) = scanner.iterate() { + actual_batches.push(batch.to_vec()); + } + + // Batch 1: [1] + let expected_batches = vec![vec![&1]]; + assert_eq!(actual_batches, expected_batches); + + let PayloadAndAlreadyHandled { + already_handled, .. + } = scanner.finalize(); + assert_eq!(already_handled, vec![false, true, false]); + } } diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs index 0e4f3d7d93..ff369d59a9 100644 --- a/core/src/unprocessed_transaction_storage.rs +++ b/core/src/unprocessed_transaction_storage.rs @@ -489,7 +489,7 @@ impl VoteStorage { } } - scanner.finalize().reached_end_of_slot + scanner.finalize().payload.reached_end_of_slot } } @@ -901,7 +901,7 @@ impl ThreadLocalUnprocessedPackets { new_retryable_packets.extend(retryable_packets); } - let reached_end_of_slot = scanner.finalize().reached_end_of_slot; + let reached_end_of_slot = scanner.finalize().payload.reached_end_of_slot; self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets; self.verify_priority_queue(original_capacity);