Refactor validator windowing

- a unit test for windowing functions
- issue #857
This commit is contained in:
Pankaj Garg 2018-08-06 17:58:44 +00:00 committed by Grimes
parent db2392a691
commit ceb5a76609
1 changed files with 71 additions and 17 deletions

88
src/streamer.rs Executable file → Normal file
View File

@ -13,6 +13,7 @@ use std::cmp;
use std::collections::VecDeque;
use std::mem;
use std::net::{SocketAddr, UdpSocket};
use std::result;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
@ -442,6 +443,42 @@ fn process_blob(
}
}
#[derive(Debug, PartialEq, Eq)]
enum RecvWindowError {
WindowOverrun,
AlreadyReceived,
}
fn validate_blob_against_window(
debug_id: u64,
pix: u64,
consumed: u64,
received: u64,
) -> result::Result<u64, RecvWindowError> {
// Prevent receive window from running over
if pix >= consumed + WINDOW_SIZE {
debug!(
"{:x}: received: {} will overrun window: {} skipping..",
debug_id,
pix,
consumed + WINDOW_SIZE
);
return Err(RecvWindowError::WindowOverrun);
}
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < consumed {
debug!(
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id, pix, consumed
);
return Err(RecvWindowError::AlreadyReceived);
}
Ok(cmp::max(pix, received))
}
fn recv_window(
debug_id: u64,
window: &SharedWindow,
@ -489,23 +526,14 @@ fn recv_window(
let p = b.write().expect("'b' write lock in fn recv_window");
(p.get_index()?, p.meta.size)
};
// Prevent receive window from running over
if pix >= *consumed + WINDOW_SIZE {
recycler.recycle(b);
continue;
}
if pix > *received {
*received = pix;
}
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < *consumed {
debug!(
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id, pix, *consumed
);
recycler.recycle(b);
continue;
let result = validate_blob_against_window(debug_id, pix, *consumed, *received);
match result {
Ok(v) => *received = v,
Err(_e) => {
recycler.recycle(b);
continue;
}
}
trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
@ -930,6 +958,8 @@ mod test {
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::calculate_highest_lost_blob_index;
use streamer::validate_blob_against_window;
use streamer::RecvWindowError;
use streamer::{blob_receiver, receiver, responder, window};
use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};
@ -1107,4 +1137,28 @@ mod test {
WINDOW_SIZE + 9
);
}
#[test]
pub fn validate_blob_against_window_test() {
assert_eq!(
validate_blob_against_window(0, 90 + WINDOW_SIZE, 90, 100).unwrap_err(),
RecvWindowError::WindowOverrun
);
assert_eq!(
validate_blob_against_window(0, 91 + WINDOW_SIZE, 90, 100).unwrap_err(),
RecvWindowError::WindowOverrun
);
assert_eq!(
validate_blob_against_window(0, 89, 90, 100).unwrap_err(),
RecvWindowError::AlreadyReceived
);
assert_eq!(
validate_blob_against_window(0, 91, 90, 100).ok().unwrap(),
100
);
assert_eq!(
validate_blob_against_window(0, 101, 90, 100).ok().unwrap(),
101
);
}
}