Inline window method implementations

Greg Fitzgerald 2018-09-07 13:48:35 -06:00
parent 2b44c4504a
commit 5025d89c88
1 changed file with 170 additions and 219 deletions

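The whole diff applies one pattern: each `WindowUtil` method on `Window` used to forward to a module-level free function, and this commit moves each function body into the trait impl (renaming `window[..]` to `self[..]`) and deletes the free function. A minimal, compilable sketch of that before/after shape, using hypothetical names rather than this crate's types:

    // Sketch of the refactor pattern only; not this crate's types.
    type Window = Vec<Option<u64>>;

    trait Buf {
        fn clear(&mut self) -> usize;
    }

    // Before (shape only): the impl forwarded to a module-level function.
    //
    //     impl Buf for Window {
    //         fn clear(&mut self) -> usize { clear_window(self) }
    //     }
    //
    //     fn clear_window(window: &mut Window) -> usize { /* body */ 0 }

    // After: the body lives in the impl, `window[..]` becomes `self[..]`,
    // and the free function is deleted.
    impl Buf for Window {
        fn clear(&mut self) -> usize {
            let cleared = self.iter().filter(|slot| slot.is_some()).count();
            for slot in self.iter_mut() {
                *slot = None;
            }
            cleared
        }
    }

    fn main() {
        let mut w: Window = vec![Some(1), None, Some(3)];
        assert_eq!(w.clear(), 2);
    }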

@@ -56,6 +56,7 @@ pub struct WindowIndex {
}
pub trait WindowUtil {
+ /// Finds available slots, clears them, and returns their indices.
fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec<u64>;
fn repair(
@@ -85,7 +86,18 @@ pub trait WindowUtil {
impl WindowUtil for Window {
fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec<u64> {
- clear_window_slots(self, recycler, consumed, received)
+ (consumed..received)
+ .filter_map(|pix| {
+ let i = (pix % WINDOW_SIZE) as usize;
+ if let Some(blob_idx) = self[i].blob_index() {
+ if blob_idx == pix {
+ return None;
+ }
+ }
+ self[i].clear_data(recycler);
+ Some(pix)
+ })
+ .collect()
}
fn repair(
@@ -97,13 +109,87 @@ impl WindowUtil for Window {
consumed: u64,
received: u64,
) -> Vec<(SocketAddr, Vec<u8>)> {
- repair_window(self, crdt, recycler, id, times, consumed, received)
+ let num_peers = crdt.read().unwrap().table.len() as u64;
+ let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received);
+ let idxs = self.clear_slots(recycler, consumed, highest_lost);
+ let reqs: Vec<_> = idxs
+ .into_iter()
+ .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok())
+ .collect();
+ inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
+ if log_enabled!(Level::Trace) {
+ trace!(
+ "{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
+ id,
+ times,
+ consumed,
+ highest_lost,
+ reqs.len()
+ );
+ for (to, _) in &reqs {
+ trace!("{}: repair_window request to {}", id, to);
+ }
+ }
+ reqs
}
fn print(&self, id: &Pubkey, consumed: u64) -> String {
- print_window(self, id, consumed)
+ let pointer: Vec<_> = self
+ .iter()
+ .enumerate()
+ .map(|(i, _v)| {
+ if i == (consumed % WINDOW_SIZE) as usize {
+ "V"
+ } else {
+ " "
+ }
+ })
+ .collect();
+ let buf: Vec<_> = self
+ .iter()
+ .map(|v| {
+ if v.data.is_none() && v.coding.is_none() {
+ "O"
+ } else if v.data.is_some() && v.coding.is_some() {
+ "D"
+ } else if v.data.is_some() {
+ // coding.is_none()
+ "d"
+ } else {
+ // data.is_none()
+ "c"
+ }
+ })
+ .collect();
+ format!(
+ "\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
+ id,
+ consumed,
+ pointer.join(""),
+ id,
+ consumed,
+ buf.join("")
+ )
}
+ /// process a blob: Add blob to the window. If a continuous set of blobs
+ /// starting from consumed is thereby formed, add that continuous
+ /// range of blobs to a queue to be sent on to the next stage.
+ ///
+ /// * `self` - the window we're operating on
+ /// * `id` - this node's id
+ /// * `blob` - the blob to be processed into the window and rebroadcast
+ /// * `pix` - the index of the blob, corresponds to
+ /// the entry height of this blob
+ /// * `consume_queue` - output, blobs to be rebroadcast are placed here
+ /// * `recycler` - where to return the blob once processed, also where
+ /// to return old blobs from the window
+ /// * `consumed` - input/output, the entry-height to which this
+ /// node has populated and rebroadcast entries
fn process_blob(
&mut self,
id: &Pubkey,
@@ -115,39 +201,89 @@ impl WindowUtil for Window {
leader_unknown: bool,
pending_retransmits: &mut bool,
) {
- process_blob(
- self,
- id,
- blob,
- pix,
- consume_queue,
- recycler,
- consumed,
- leader_unknown,
- pending_retransmits,
- );
- }
- }
- /// Finds available slots, clears them, and returns their indices.
- fn clear_window_slots(
- window: &mut Window,
- recycler: &BlobRecycler,
- consumed: u64,
- received: u64,
- ) -> Vec<u64> {
- (consumed..received)
- .filter_map(|pix| {
- let i = (pix % WINDOW_SIZE) as usize;
- if let Some(blob_idx) = window[i].blob_index() {
- if blob_idx == pix {
- return None;
- }
- }
- window[i].clear_data(recycler);
- Some(pix)
- })
- .collect()
- }
+ let w = (pix % WINDOW_SIZE) as usize;
+ let is_coding = {
+ let blob_r = blob
+ .read()
+ .expect("blob read lock for flogs streamer::window");
+ blob_r.is_coding()
+ };
+ // insert a newly received blob into a window slot, clearing out and recycling any previous
+ // blob unless the incoming blob is a duplicate (based on idx)
+ // returns whether the incoming is a duplicate blob
+ fn insert_blob_is_dup(
+ id: &Pubkey,
+ blob: SharedBlob,
+ pix: u64,
+ window_slot: &mut Option<SharedBlob>,
+ recycler: &BlobRecycler,
+ c_or_d: &str,
+ ) -> bool {
+ if let Some(old) = mem::replace(window_slot, Some(blob)) {
+ let is_dup = old.read().unwrap().get_index().unwrap() == pix;
+ recycler.recycle(old, "insert_blob_is_dup");
+ trace!(
+ "{}: occupied {} window slot {:}, is_dup: {}",
+ id,
+ c_or_d,
+ pix,
+ is_dup
+ );
+ is_dup
+ } else {
+ trace!("{}: empty {} window slot {:}", id, c_or_d, pix);
+ false
+ }
+ }
+ // insert the new blob into the window, overwrite and recycle old (or duplicate) entry
+ let is_duplicate = if is_coding {
+ insert_blob_is_dup(id, blob, pix, &mut self[w].coding, recycler, "coding")
+ } else {
+ insert_blob_is_dup(id, blob, pix, &mut self[w].data, recycler, "data")
+ };
+ if is_duplicate {
+ return;
+ }
+ self[w].leader_unknown = leader_unknown;
+ *pending_retransmits = true;
+ #[cfg(feature = "erasure")]
+ {
+ if erasure::recover(
+ id,
+ recycler,
+ self,
+ *consumed,
+ (*consumed % WINDOW_SIZE) as usize,
+ ).is_err()
+ {
+ trace!("{}: erasure::recover failed", id);
+ }
+ }
+ // push all contiguous blobs into consumed queue, increment consumed
+ loop {
+ let k = (*consumed % WINDOW_SIZE) as usize;
+ trace!("{}: k: {} consumed: {}", id, k, *consumed,);
+ if let Some(blob) = &self[k].data {
+ if blob.read().unwrap().get_index().unwrap() < *consumed {
+ // window wrap-around, end of received
+ break;
+ }
+ } else {
+ // self[k].data is None, end of received
+ break;
+ }
+ consume_queue.push(self[k].data.clone().expect("clone in fn recv_window"));
+ *consumed += 1;
+ }
+ }
+ }
fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
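Both `clear_slots` and the inlined `process_blob` above rely on the same ring-buffer convention: a blob with index `pix` lives in slot `pix % WINDOW_SIZE`, the slot is current only if the index stored there equals `pix`, and consumption walks forward over contiguous filled slots starting at `consumed`. A stripped-down sketch of that consumption loop, with a plain `Vec<Option<u64>>` standing in for the window and the blob reduced to its index (an illustration, not the crate's types):

    const WINDOW_SIZE: u64 = 8;

    // Toy window: each slot optionally holds the index of the blob stored there.
    type Window = Vec<Option<u64>>;

    // Push all contiguous entries starting at *consumed into `consume_queue`,
    // advancing *consumed; stop at a hole or at wrap-around (stored index < consumed).
    fn drain_contiguous(window: &Window, consumed: &mut u64, consume_queue: &mut Vec<u64>) {
        loop {
            let k = (*consumed % WINDOW_SIZE) as usize;
            match window[k] {
                Some(idx) if idx >= *consumed => {
                    consume_queue.push(idx);
                    *consumed += 1;
                }
                // None => hole; Some(idx) with idx < consumed => stale slot from a
                // previous trip around the ring. Either way, stop.
                _ => break,
            }
        }
    }

    fn main() {
        let mut window: Window = vec![None; WINDOW_SIZE as usize];
        for pix in [3u64, 4, 6] {
            window[(pix % WINDOW_SIZE) as usize] = Some(pix);
        }
        let mut consumed = 3;
        let mut queue = Vec::new();
        drain_contiguous(&window, &mut consumed, &mut queue);
        assert_eq!(queue, vec![3, 4]); // stops at the hole at index 5
        assert_eq!(consumed, 5);
    }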
@@ -186,42 +322,6 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
thread_rng().gen_range(0, *times as u64) == 0
}
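The surviving tail of `repair_backoff` gates repeated repair rounds probabilistically: `gen_range(0, times)` is 0 with probability 1/`times`, so the more often repair has already fired at the same `consumed` height, the less likely another round of requests goes out. A small self-contained sketch of that gate (using the two-argument `gen_range` of the rand 0.x series, as in this file; the `max(1)` guard is added only so the standalone example cannot panic):

    use rand::{thread_rng, Rng};

    // Fires with probability 1/times: the longer we've been stuck at the same
    // `consumed` height, the more we back off on sending repair requests.
    fn repair_gate(times: usize) -> bool {
        let times = times.max(1) as u64; // guard: gen_range panics on an empty range
        thread_rng().gen_range(0, times) == 0
    }

    fn main() {
        let fired = (0..10_000).filter(|_| repair_gate(8)).count();
        println!("fired {} times out of 10000 (expect roughly 1250)", fired);
    }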
fn repair_window(
window: &mut Window,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
id: &Pubkey,
times: usize,
consumed: u64,
received: u64,
) -> Vec<(SocketAddr, Vec<u8>)> {
let num_peers = crdt.read().unwrap().table.len() as u64;
let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received);
let idxs = window.clear_slots(recycler, consumed, highest_lost);
let reqs: Vec<_> = idxs
.into_iter()
.filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok())
.collect();
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
if log_enabled!(Level::Trace) {
trace!(
"{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
id,
times,
consumed,
highest_lost,
reqs.len()
);
for (to, _) in &reqs {
trace!("{}: repair_window request to {}", id, to);
}
}
reqs
}
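The deleted `repair_window` (now `WindowUtil::repair` above) reduces to three steps: compute the highest blob index worth requesting, clear stale slots up to it, and turn each still-missing index into a request addressed to some peer. A sketch of that last collection step, with a hypothetical `peer_for` lookup standing in for `Crdt::window_index_request` (an assumption, not the real API):

    use std::net::SocketAddr;

    // Hypothetical stand-in for Crdt::window_index_request: pick a peer that
    // might hold blob `pix`, or None if there is nobody to ask.
    fn peer_for(pix: u64, peers: &[SocketAddr]) -> Option<(SocketAddr, Vec<u8>)> {
        let peer = *peers.get((pix as usize) % peers.len().max(1))?;
        Some((peer, pix.to_le_bytes().to_vec())) // request payload is just the index here
    }

    fn build_repair_requests(missing: Vec<u64>, peers: &[SocketAddr]) -> Vec<(SocketAddr, Vec<u8>)> {
        missing
            .into_iter()
            .filter_map(|pix| peer_for(pix, peers))
            .collect()
    }

    fn main() {
        let peers: Vec<SocketAddr> =
            vec!["127.0.0.1:8001".parse().unwrap(), "127.0.0.1:8002".parse().unwrap()];
        let reqs = build_repair_requests(vec![10, 11, 12], &peers);
        assert_eq!(reqs.len(), 3);
    }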
fn add_block_to_retransmit_queue(
b: &SharedBlob,
leader_id: Pubkey,
@@ -320,115 +420,6 @@ fn retransmit_all_leader_blocks(
Ok(())
}
/// process a blob: Add blob to the window. If a continuous set of blobs
/// starting from consumed is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage.
///
/// * `id` - this node's id
/// * `blob` - the blob to be processed into the window and rebroadcast
/// * `pix` - the index of the blob, corresponds to
/// the entry height of this blob
/// * `consume_queue` - output, blobs to be rebroadcast are placed here
/// * `window` - the window we're operating on
/// * `recycler` - where to return the blob once processed, also where
/// to return old blobs from the window
/// * `consumed` - input/output, the entry-height to which this
/// node has populated and rebroadcast entries
fn process_blob(
window: &mut Window,
id: &Pubkey,
blob: SharedBlob,
pix: u64,
consume_queue: &mut SharedBlobs,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
) {
let w = (pix % WINDOW_SIZE) as usize;
let is_coding = {
let blob_r = blob
.read()
.expect("blob read lock for flogs streamer::window");
blob_r.is_coding()
};
// insert a newly received blob into a window slot, clearing out and recycling any previous
// blob unless the incoming blob is a duplicate (based on idx)
// returns whether the incoming is a duplicate blob
fn insert_blob_is_dup(
id: &Pubkey,
blob: SharedBlob,
pix: u64,
window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str,
) -> bool {
if let Some(old) = mem::replace(window_slot, Some(blob)) {
let is_dup = old.read().unwrap().get_index().unwrap() == pix;
recycler.recycle(old, "insert_blob_is_dup");
trace!(
"{}: occupied {} window slot {:}, is_dup: {}",
id,
c_or_d,
pix,
is_dup
);
is_dup
} else {
trace!("{}: empty {} window slot {:}", id, c_or_d, pix);
false
}
}
// insert the new blob into the window, overwrite and recycle old (or duplicate) entry
let is_duplicate = if is_coding {
insert_blob_is_dup(id, blob, pix, &mut window[w].coding, recycler, "coding")
} else {
insert_blob_is_dup(id, blob, pix, &mut window[w].data, recycler, "data")
};
if is_duplicate {
return;
}
window[w].leader_unknown = leader_unknown;
*pending_retransmits = true;
#[cfg(feature = "erasure")]
{
if erasure::recover(
id,
recycler,
window,
*consumed,
(*consumed % WINDOW_SIZE) as usize,
).is_err()
{
trace!("{}: erasure::recover failed", id);
}
}
// push all contiguous blobs into consumed queue, increment consumed
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,);
if let Some(blob) = &window[k].data {
if blob.read().unwrap().get_index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
} else {
// window[k].data is None, end of received
break;
}
consume_queue.push(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1;
}
}
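The nested `insert_blob_is_dup` (unchanged, just relocated into the trait method) leans on `mem::replace`: swap the new blob into the slot, then inspect whatever was evicted, and if the old occupant carried the same index the incoming blob is a duplicate. The same idiom on a bare `Option<u64>` slot, without the recycler or the blob locking (a sketch only):

    use std::mem;

    // Returns true if the slot already held an entry with the same index `pix`.
    // Either way, `pix` now occupies the slot and the old occupant is handed
    // back to the caller's "recycler" (here: simply dropped).
    fn insert_is_dup(slot: &mut Option<u64>, pix: u64) -> bool {
        match mem::replace(slot, Some(pix)) {
            Some(old) => old == pix, // occupied: duplicate iff the index matches
            None => false,           // slot was empty
        }
    }

    fn main() {
        let mut slot = None;
        assert!(!insert_is_dup(&mut slot, 7));  // first insert
        assert!(insert_is_dup(&mut slot, 7));   // same index again: duplicate
        assert!(!insert_is_dup(&mut slot, 15)); // wrap-around overwrite, not a dup
    }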
fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
// Prevent receive window from running over
// Got a blob which has already been consumed, skip it
@@ -555,46 +546,6 @@ fn recv_window(
Ok(())
}
pub fn print_window(window: &Window, id: &Pubkey, consumed: u64) -> String {
let pointer: Vec<_> = window
.iter()
.enumerate()
.map(|(i, _v)| {
if i == (consumed % WINDOW_SIZE) as usize {
"V"
} else {
" "
}
})
.collect();
let buf: Vec<_> = window
.iter()
.map(|v| {
if v.data.is_none() && v.coding.is_none() {
"O"
} else if v.data.is_some() && v.coding.is_some() {
"D"
} else if v.data.is_some() {
// coding.is_none()
"d"
} else {
// data.is_none()
"c"
}
})
.collect();
format!(
"\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
id,
consumed,
pointer.join(""),
id,
consumed,
buf.join("")
)
}
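The deleted `print_window` (now `WindowUtil::print` above) renders two aligned rows: a pointer row with a `V` under the slot that `consumed` maps to, and a legend row where `O` is an empty slot, `D` holds both data and coding, `d` holds data only, and `c` holds coding only. A toy version of the same rendering over plain `(has_data, has_coding)` pairs (illustrative only):

    // Legend: O = empty, D = data + coding, d = data only, c = coding only.
    fn legend(slots: &[(bool, bool)], consumed_slot: usize) -> String {
        let pointer: String = (0..slots.len())
            .map(|i| if i == consumed_slot { 'V' } else { ' ' })
            .collect();
        let row: String = slots
            .iter()
            .map(|&(data, coding)| match (data, coding) {
                (false, false) => 'O',
                (true, true) => 'D',
                (true, false) => 'd',
                (false, true) => 'c',
            })
            .collect();
        format!("{}\n{}", pointer, row)
    }

    fn main() {
        let slots = [(true, true), (true, false), (false, false), (false, true)];
        println!("{}", legend(&slots, 2)); // prints "  V " over "DdOc"
    }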
pub fn default_window() -> SharedWindow {
Arc::new(RwLock::new(vec![
WindowSlot::default();