MultiIteratorScanner::finalize returns (payload, already_processed) (#31054)
* finalize() returns (payload, already_processed) * Additional testing around already_handled * Return type wrapper and comment update
This commit is contained in:
parent
15011eaa5a
commit
926bb0c794
|
@ -81,6 +81,11 @@ where
|
||||||
initialized: bool,
|
initialized: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct PayloadAndAlreadyHandled<U> {
|
||||||
|
pub payload: U,
|
||||||
|
pub already_handled: Vec<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
impl<'a, T, U, F> MultiIteratorScanner<'a, T, U, F>
|
impl<'a, T, U, F> MultiIteratorScanner<'a, T, U, F>
|
||||||
where
|
where
|
||||||
F: FnMut(&T, &mut U) -> ProcessingDecision,
|
F: FnMut(&T, &mut U) -> ProcessingDecision,
|
||||||
|
@ -113,9 +118,13 @@ where
|
||||||
self.get_current_items()
|
self.get_current_items()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Consume the iterator and return the payload.
|
/// Consume the iterator. Return the payload, and a vector of booleans
|
||||||
pub fn finalize(self) -> U {
|
/// indicating which items have been handled.
|
||||||
self.payload
|
pub fn finalize(self) -> PayloadAndAlreadyHandled<U> {
|
||||||
|
PayloadAndAlreadyHandled {
|
||||||
|
payload: self.payload,
|
||||||
|
already_handled: self.already_handled,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Initialize the `current_positions` vector for the first batch.
|
/// Initialize the `current_positions` vector for the first batch.
|
||||||
|
@ -192,7 +201,7 @@ where
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use {super::MultiIteratorScanner, crate::multi_iterator_scanner::ProcessingDecision};
|
use super::*;
|
||||||
|
|
||||||
struct TestScannerPayload {
|
struct TestScannerPayload {
|
||||||
locks: Vec<bool>,
|
locks: Vec<bool>,
|
||||||
|
@ -282,8 +291,12 @@ mod tests {
|
||||||
let expected_batches = vec![vec![&0, &1], vec![&0, &2], vec![&0, &3], vec![&1]];
|
let expected_batches = vec![vec![&0, &1], vec![&0, &2], vec![&0, &3], vec![&1]];
|
||||||
assert_eq!(actual_batches, expected_batches);
|
assert_eq!(actual_batches, expected_batches);
|
||||||
|
|
||||||
let TestScannerPayload { locks } = scanner.finalize();
|
let PayloadAndAlreadyHandled {
|
||||||
|
payload: TestScannerPayload { locks },
|
||||||
|
already_handled,
|
||||||
|
} = scanner.finalize();
|
||||||
assert_eq!(locks, vec![false; 4]);
|
assert_eq!(locks, vec![false; 4]);
|
||||||
|
assert!(already_handled.into_iter().all(|x| x));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -325,8 +338,12 @@ mod tests {
|
||||||
];
|
];
|
||||||
assert_eq!(actual_batches, expected_batches);
|
assert_eq!(actual_batches, expected_batches);
|
||||||
|
|
||||||
let TestScannerPayload { locks } = scanner.finalize();
|
let PayloadAndAlreadyHandled {
|
||||||
|
payload: TestScannerPayload { locks },
|
||||||
|
already_handled,
|
||||||
|
} = scanner.finalize();
|
||||||
assert_eq!(locks, vec![false; 4]);
|
assert_eq!(locks, vec![false; 4]);
|
||||||
|
assert!(already_handled.into_iter().all(|x| x));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -350,4 +367,30 @@ mod tests {
|
||||||
let expected_batches = vec![vec![&0, &1], vec![&2]];
|
let expected_batches = vec![vec![&0, &1], vec![&2]];
|
||||||
assert_eq!(actual_batches, expected_batches);
|
assert_eq!(actual_batches, expected_batches);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multi_iterator_scanner_iterate_not_handled() {
|
||||||
|
let slice = [0, 1, 2];
|
||||||
|
|
||||||
|
// 0 and 2 will always be marked as later, and never actually handled
|
||||||
|
let should_process = |item: &i32, _payload: &mut ()| match item {
|
||||||
|
1 => ProcessingDecision::Now,
|
||||||
|
_ => ProcessingDecision::Later,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut scanner = MultiIteratorScanner::new(&slice, 2, (), should_process);
|
||||||
|
let mut actual_batches = vec![];
|
||||||
|
while let Some((batch, _payload)) = scanner.iterate() {
|
||||||
|
actual_batches.push(batch.to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Batch 1: [1]
|
||||||
|
let expected_batches = vec![vec![&1]];
|
||||||
|
assert_eq!(actual_batches, expected_batches);
|
||||||
|
|
||||||
|
let PayloadAndAlreadyHandled {
|
||||||
|
already_handled, ..
|
||||||
|
} = scanner.finalize();
|
||||||
|
assert_eq!(already_handled, vec![false, true, false]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -489,7 +489,7 @@ impl VoteStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
scanner.finalize().reached_end_of_slot
|
scanner.finalize().payload.reached_end_of_slot
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -901,7 +901,7 @@ impl ThreadLocalUnprocessedPackets {
|
||||||
new_retryable_packets.extend(retryable_packets);
|
new_retryable_packets.extend(retryable_packets);
|
||||||
}
|
}
|
||||||
|
|
||||||
let reached_end_of_slot = scanner.finalize().reached_end_of_slot;
|
let reached_end_of_slot = scanner.finalize().payload.reached_end_of_slot;
|
||||||
|
|
||||||
self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets;
|
self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets;
|
||||||
self.verify_priority_queue(original_capacity);
|
self.verify_priority_queue(original_capacity);
|
||||||
|
|
Loading…
Reference in New Issue