leader slots in Blobs (#1732)

* add leader slot to Blobs
* remove get_X() methods in favor of X() methods for Blob
* add slot to get_scheduled_leader()
Rob Walker 2018-11-07 13:18:14 -08:00 committed by GitHub
parent 3ccbf81646
commit 6c10458b5b
14 changed files with 129 additions and 118 deletions
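The net effect on the wire format: the Blob header gains a leading u64 slot field ahead of the existing index. A sketch of the resulting layout, with offsets derived from the BLOB_*_END constants in the packet module later in the diff (byte sizes assume size_of::<Pubkey>() == 32, which this diff does not state):

```rust
// Blob header layout after this commit (offsets in bytes):
//
//   [ 0.. 8)  slot       u64, little-endian   <- new in this commit
//   [ 8..16)  index      u64, little-endian
//   [16..48)  id         Pubkey of the sending leader
//   [48..52)  flags      u32 (e.g. BLOB_FLAG_IS_CODING)
//   [52..60)  data size  u64
//
// BLOB_HEADER_SIZE = align!(60, 64) = 64, the same as before the change,
// so the payload still starts at the same 64-byte boundary.
```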

View File

@@ -1417,9 +1417,11 @@ impl Bank {
subscriptions.remove(pubkey).is_some()
}
pub fn get_current_leader(&self) -> Option<Pubkey> {
let ls_lock = self.leader_scheduler.read().unwrap();
ls_lock.get_scheduled_leader(self.tick_height())
pub fn get_current_leader(&self) -> Option<(Pubkey, u64)> {
self.leader_scheduler
.read()
.unwrap()
.get_scheduled_leader(self.tick_height())
}
pub fn tick_height(&self) -> u64 {
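Every call site migrates from a bare Pubkey to a (Pubkey, u64) pair. A minimal sketch of the new shape, mirroring the Fullnode and ReplicateStage changes later in the diff:

```rust
// Destructure the slot alongside the leader id; callers that only need
// the id discard the slot with `_`:
let (scheduled_leader, leader_slot) = bank
    .get_current_leader()
    .expect("leader not known after processing bank");
```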

View File

@@ -9,7 +9,7 @@ use influx_db_client as influxdb;
use ledger::Block;
use log::Level;
use metrics;
use packet::SharedBlobs;
use packet::{index_blobs, SharedBlobs};
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
@@ -20,7 +20,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
use timing::duration_as_ms;
use window::{self, SharedWindow, WindowIndex, WindowUtil};
use window::{SharedWindow, WindowIndex, WindowUtil};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastStageReturnType {
@@ -36,6 +36,7 @@ fn broadcast(
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
receive_index: &mut u64,
leader_slot: u64,
) -> Result<()> {
let id = node_info.id;
let timer = Duration::new(1, 0);
@@ -73,10 +74,7 @@ fn broadcast(
let blobs_len = blobs.len();
trace!("{}: broadcast blobs.len: {}", id, blobs_len);
// TODO: move all this into window.rs
// Index the blobs
window::index_blobs(node_info, &blobs, receive_index)
.expect("index blobs for initial window");
index_blobs(&blobs, &node_info.id, *receive_index, leader_slot);
// keep the cache of blobs that are broadcast
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
@@ -84,13 +82,13 @@ fn broadcast(
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
if let Some(x) = win[pos].data.take() {
trace!(
"{} popped {} at {}",
id,
x.read().unwrap().get_index().unwrap(),
x.read().unwrap().index().unwrap(),
pos
);
}
@@ -98,7 +96,7 @@ fn broadcast(
trace!(
"{} popped {} at {}",
id,
x.read().unwrap().get_index().unwrap(),
x.read().unwrap().index().unwrap(),
pos
);
}
@@ -106,7 +104,7 @@ fn broadcast(
trace!("{} null {}", id, pos);
}
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(win[pos].data.is_none());
@@ -188,6 +186,7 @@ impl BroadcastStage {
cluster_info: &Arc<RwLock<ClusterInfo>>,
window: &SharedWindow,
entry_height: u64,
leader_slot: u64,
receiver: &Receiver<Vec<Entry>>,
) -> BroadcastStageReturnType {
let mut transmit_index = WindowIndex {
@@ -206,6 +205,7 @@ impl BroadcastStage {
&sock,
&mut transmit_index,
&mut receive_index,
leader_slot,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
@@ -242,6 +242,7 @@ impl BroadcastStage {
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
entry_height: u64,
leader_slot: u64,
receiver: Receiver<Vec<Entry>>,
exit_sender: Arc<AtomicBool>,
) -> Self {
@@ -249,7 +250,14 @@ impl BroadcastStage {
.name("solana-broadcaster".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_sender);
Self::run(&sock, &cluster_info, &window, entry_height, &receiver)
Self::run(
&sock,
&cluster_info,
&window,
entry_height,
leader_slot,
&receiver,
)
}).unwrap();
BroadcastStage { thread_hdl }
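The new leader_slot parameter is threaded from new() through run() into each broadcast() call, where index_blobs() stamps it into every outgoing blob. A sketch of the constructor call after this change (argument names taken from the Fullnode change later in the diff; the first argument is elided there, so `sock` is an assumption):

```rust
let broadcast_stage = BroadcastStage::new(
    sock,
    cluster_info.clone(),
    shared_window.clone(),
    entry_height,
    leader_slot, // new: stamped into every blob this stage broadcasts
    entry_receiver,
    tpu_exit,
);
```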

View File

@@ -539,7 +539,7 @@ impl ClusterInfo {
trace!(
"{}: BROADCAST idx: {} sz: {} to {},{} coding: {}",
me.id,
blob.get_index().unwrap(),
blob.index().unwrap(),
blob.meta.size,
v.id,
v.contact_info.tvu,
@@ -587,7 +587,7 @@ impl ClusterInfo {
};
blob.write()
.unwrap()
.set_id(me.id)
.set_id(&me.id)
.expect("set_id in pub fn retransmit");
let rblob = blob.read().unwrap();
let orders: Vec<_> = table
@@ -617,7 +617,7 @@ impl ClusterInfo {
debug!(
"{}: retransmit blob {} to {} {}",
me.id,
rblob.get_index().unwrap(),
rblob.index().unwrap(),
v.id,
v.contact_info.tvu,
);
@@ -869,7 +869,7 @@ impl ClusterInfo {
let pos = (ix as usize) % window.read().unwrap().len();
if let Some(ref mut blob) = &mut window.write().unwrap()[pos].data {
let mut wblob = blob.write().unwrap();
let blob_ix = wblob.get_index().expect("run_window_request get_index");
let blob_ix = wblob.index().expect("run_window_request index");
if blob_ix == ix {
let num_retransmits = wblob.meta.num_retransmits;
wblob.meta.num_retransmits += 1;
@@ -896,7 +896,7 @@ impl ClusterInfo {
outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&wblob.data[..sz]);
outblob.meta.set_addr(from_addr);
outblob.set_id(sender_id).expect("blob set_id");
outblob.set_id(&sender_id).expect("blob set_id");
}
inc_new_counter_info!("cluster_info-window-request-pass", 1);
@@ -1735,7 +1735,7 @@ mod tests {
} else {
mock_peer.id
};
assert_eq!(blob.get_id().unwrap(), id);
assert_eq!(blob.id().unwrap(), id);
}
}

View File

@@ -108,7 +108,7 @@ impl Entry {
blob_w.set_index(idx).expect("set_index()");
}
if let Some(id) = id {
blob_w.set_id(id).expect("set_id()");
blob_w.set_id(&id).expect("set_id()");
}
if let Some(addr) = addr {
blob_w.meta.set_addr(addr);

View File

@@ -295,8 +295,8 @@ pub fn generate_coding(
if let Some(data) = &window[n].data {
let data_rl = data.read().unwrap();
let index = data_rl.get_index().unwrap();
let id = data_rl.get_id().unwrap();
let index = data_rl.index().unwrap();
let id = data_rl.id().unwrap();
trace!(
"{} copying index {} id {:?} from data to coding",
@@ -305,7 +305,7 @@ pub fn generate_coding(
id
);
coding_wl.set_index(index).unwrap();
coding_wl.set_id(id).unwrap();
coding_wl.set_id(&id).unwrap();
}
coding_wl.set_size(max_data_size);
if coding_wl.set_coding().is_err() {
@@ -351,7 +351,7 @@ pub fn generate_coding(
// false if slot has a blob with the right index
fn is_missing(id: &Pubkey, idx: u64, window_slot: &mut Option<SharedBlob>, c_or_d: &str) -> bool {
if let Some(blob) = window_slot.take() {
let blob_idx = blob.read().unwrap().get_index().unwrap();
let blob_idx = blob.read().unwrap().index().unwrap();
if blob_idx == idx {
trace!("recover {}: idx: {} good {}", id, idx, c_or_d);
// put it back
@@ -553,7 +553,7 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: us
let mut data_size;
if n < NUM_DATA {
data_size = locks[n].get_data_size().unwrap() as usize;
data_size = locks[n].data_size().unwrap() as usize;
data_size -= BLOB_HEADER_SIZE;
if data_size > BLOB_DATA_SIZE {
error!("{} corrupt data blob[{}] data_size: {}", id, idx, data_size);
@@ -591,15 +591,14 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: us
#[cfg(test)]
mod test {
use cluster_info;
use erasure;
use logger;
use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
use packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
use rand::{thread_rng, Rng};
use signature::{Keypair, KeypairUtil};
use solana_sdk::pubkey::Pubkey;
// use std::sync::{Arc, RwLock};
use window::{index_blobs, WindowSlot};
use window::WindowSlot;
#[test]
pub fn test_coding() {
@@ -660,7 +659,7 @@ mod test {
let window_l2 = window_l1.read().unwrap();
print!(
"data index: {:?} meta.size: {} data: ",
window_l2.get_index(),
window_l2.index(),
window_l2.meta.size
);
for i in 0..64 {
@@ -676,7 +675,7 @@ mod test {
let window_l2 = window_l1.read().unwrap();
print!(
"coding index: {:?} meta.size: {} data: ",
window_l2.get_index(),
window_l2.index(),
window_l2.meta.size
);
for i in 0..8 {
@@ -726,10 +725,9 @@ mod test {
blobs.push(b_);
}
let d = cluster_info::NodeInfo::new_localhost(Keypair::new().pubkey());
assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok());
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 0);
for b in blobs {
let idx = b.read().unwrap().get_index().unwrap() as usize % WINDOW_SIZE;
let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
window[idx].data = Some(b);
}
@@ -815,7 +813,7 @@ mod test {
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
assert_eq!(
window_l2.get_index().unwrap(),
window_l2.index().unwrap(),
(erase_offset + WINDOW_SIZE) as u64
);
}
@@ -850,7 +848,7 @@ mod test {
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
assert_eq!(
window_l2.get_index().unwrap(),
window_l2.index().unwrap(),
(erase_offset + WINDOW_SIZE) as u64
);
}
@@ -896,7 +894,7 @@ mod test {
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
assert_eq!(
window_l2.get_index().unwrap(),
window_l2.index().unwrap(),
(erase_offset + WINDOW_SIZE) as u64
);
}

View File

@@ -237,7 +237,7 @@ impl Fullnode {
}
// Get the scheduled leader
let scheduled_leader = bank
let (scheduled_leader, leader_slot) = bank
.get_current_leader()
.expect("Leader not known after processing bank");
@@ -296,6 +296,7 @@ impl Fullnode {
cluster_info.clone(),
shared_window.clone(),
entry_height,
leader_slot,
entry_receiver,
tpu_exit,
);
@@ -354,7 +355,7 @@ impl Fullnode {
);
let new_bank = Arc::new(new_bank);
let scheduled_leader = new_bank
let (scheduled_leader, _) = new_bank
.get_current_leader()
.expect("Scheduled leader should exist after rebuilding bank");
@@ -446,6 +447,7 @@ impl Fullnode {
self.cluster_info.clone(),
self.shared_window.clone(),
entry_height,
0, // TODO: get real leader slot from leader_scheduler
blob_receiver,
tpu_exit,
);

View File

@@ -248,15 +248,15 @@ impl LeaderScheduler {
// Uses the schedule generated by the last call to generate_schedule() to return the
// leader for a given PoH height in round-robin fashion
pub fn get_scheduled_leader(&self, height: u64) -> Option<Pubkey> {
pub fn get_scheduled_leader(&self, height: u64) -> Option<(Pubkey, u64)> {
if self.use_only_bootstrap_leader {
return Some(self.bootstrap_leader);
return Some((self.bootstrap_leader, 0));
}
// This covers cases where the schedule isn't yet generated.
if self.last_seed_height == None {
if height < self.bootstrap_height {
return Some(self.bootstrap_leader);
return Some((self.bootstrap_leader, 0));
} else {
// If there's been no schedule generated yet before we reach the end of the
// bootstrapping period, then the leader is unknown
@@ -273,9 +273,10 @@ impl LeaderScheduler {
}
// Find index into the leader_schedule that this PoH height maps to
let leader_slot = (height - self.bootstrap_height) / self.leader_rotation_interval + 1;
let index = (height - last_seed_height) / self.leader_rotation_interval;
let validator_index = index as usize % self.leader_schedule.len();
Some(self.leader_schedule[validator_index])
Some((self.leader_schedule[validator_index], leader_slot))
}
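A worked example of the slot arithmetic above, using hypothetical numbers (slot 0 is reserved for the bootstrap period, so scheduled slots start at 1):

```rust
// bootstrap_height = 1000, leader_rotation_interval = 100:
//   height 1000 -> leader_slot = (1000 - 1000) / 100 + 1 = 1
//   height 1099 -> leader_slot = 1   (one slot spans the whole interval)
//   height 1100 -> leader_slot = 2
// This matches the test below, which asserts slot == i + 1 at
// height bootstrap_height + i * leader_rotation_interval.
```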
// TODO: We use a HashSet for now because a single validator could potentially register
@@ -351,7 +352,7 @@ impl LeaderScheduler {
// If possible, try to avoid having the same leader twice in a row, but
// if there's only one leader to choose from, then we have no other choice
if validator_rankings.len() > 1 {
let old_epoch_last_leader = self
let (old_epoch_last_leader, _) = self
.get_scheduled_leader(height - 1)
.expect("Previous leader schedule should still exist");
let new_epoch_start_leader = validator_rankings[0];
@@ -587,11 +588,11 @@ mod tests {
// be the bootstrap leader
assert_eq!(
leader_scheduler.get_scheduled_leader(0),
Some(bootstrap_leader_id)
Some((bootstrap_leader_id, 0))
);
assert_eq!(
leader_scheduler.get_scheduled_leader(bootstrap_height - 1),
Some(bootstrap_leader_id)
Some((bootstrap_leader_id, 0))
);
assert_eq!(
leader_scheduler.get_scheduled_leader(bootstrap_height),
@@ -625,7 +626,7 @@ mod tests {
let mut start_leader_index = None;
for i in 0..num_rounds {
let begin_height = bootstrap_height + i * leader_rotation_interval;
let current_leader = leader_scheduler
let (current_leader, slot) = leader_scheduler
.get_scheduled_leader(begin_height)
.expect("Expected a leader from scheduler");
@@ -645,10 +646,11 @@ mod tests {
let expected_leader =
validators[(start_leader_index.unwrap() + i as usize) % num_validators];
assert_eq!(current_leader, expected_leader);
assert_eq!(slot, i + 1);
// Check that the same leader is in power for the next leader_rotation_interval entries
assert_eq!(
leader_scheduler.get_scheduled_leader(begin_height + leader_rotation_interval - 1),
Some(current_leader)
Some((current_leader, slot))
);
}
}

View File

@@ -508,9 +508,9 @@ pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entr
for blob in blobs {
let entry = {
let msg = blob.read().unwrap();
let msg_size = msg.get_size()?;
deserialize(&msg.data()[..msg_size])
let blob = blob.read().unwrap();
let blob_size = blob.size()?;
deserialize(&blob.data()[..blob_size])
};
match entry {
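size() (defined in the packet module later in the diff) returns the payload length with BLOB_HEADER_SIZE already subtracted, so the slice handed to deserialize is exactly the serialized entry. A condensed sketch of the snippet above:

```rust
let blob = blob.read().unwrap();
let blob_size = blob.size()?; // payload bytes, header excluded
let entry: Entry = deserialize(&blob.data()[..blob_size])?;
```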

View File

@@ -250,7 +250,8 @@ pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs>
Ok(blobs)
}
const BLOB_INDEX_END: usize = size_of::<u64>();
const BLOB_SLOT_END: usize = size_of::<u64>();
const BLOB_INDEX_END: usize = BLOB_SLOT_END + size_of::<u64>();
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<Pubkey>();
const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::<u32>();
const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::<u64>();
@@ -265,31 +266,42 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64);
impl Blob {
pub fn get_index(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_END]);
pub fn slot(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[0..BLOB_SLOT_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
}
pub fn set_slot(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
self.data[..BLOB_SLOT_END].clone_from_slice(&wtr);
Ok(())
}
pub fn index(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[BLOB_SLOT_END..BLOB_INDEX_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
}
pub fn set_index(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
self.data[..BLOB_INDEX_END].clone_from_slice(&wtr);
self.data[BLOB_SLOT_END..BLOB_INDEX_END].clone_from_slice(&wtr);
Ok(())
}
/// sender id, we use this for identifying if it's a blob from the leader that we should
/// retransmit. eventually blobs should have a signature that we can use for spam filtering
pub fn get_id(&self) -> Result<Pubkey> {
pub fn id(&self) -> Result<Pubkey> {
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
Ok(e)
}
pub fn set_id(&mut self, id: Pubkey) -> Result<()> {
let wtr = serialize(&id)?;
pub fn set_id(&mut self, id: &Pubkey) -> Result<()> {
let wtr = serialize(id)?;
self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr);
Ok(())
}
pub fn get_flags(&self) -> Result<u32> {
pub fn flags(&self) -> Result<u32> {
let mut rdr = io::Cursor::new(&self.data[BLOB_ID_END..BLOB_FLAGS_END]);
let r = rdr.read_u32::<LittleEndian>()?;
Ok(r)
@@ -303,15 +315,15 @@ impl Blob {
}
pub fn is_coding(&self) -> bool {
(self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0
(self.flags().unwrap() & BLOB_FLAG_IS_CODING) != 0
}
pub fn set_coding(&mut self) -> Result<()> {
let flags = self.get_flags().unwrap();
let flags = self.flags().unwrap();
self.set_flags(flags | BLOB_FLAG_IS_CODING)
}
pub fn get_data_size(&self) -> Result<u64> {
pub fn data_size(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
@@ -330,8 +342,8 @@ impl Blob {
pub fn data_mut(&mut self) -> &mut [u8] {
&mut self.data[BLOB_HEADER_SIZE..]
}
pub fn get_size(&self) -> Result<usize> {
let size = self.get_data_size()? as usize;
pub fn size(&self) -> Result<usize> {
let size = self.data_size()? as usize;
if self.meta.size == size {
Ok(size - BLOB_HEADER_SIZE)
} else {
@@ -406,6 +418,20 @@ impl Blob {
}
}
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slot: u64) {
// walk the blobs, assigning consecutive indices and stamping slot and id on each
for b in blobs {
let mut blob = b.write().unwrap();
blob.set_index(index).expect("set_index");
blob.set_slot(slot).expect("set_slot");
blob.set_id(id).expect("set_id");
blob.set_flags(0).unwrap();
index += 1;
}
}
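The loop establishes a simple invariant on the batch; a sketch of what each blob carries afterwards:

```rust
// After index_blobs(&blobs, id, receive_index, slot), blob k satisfies:
//   blob.index() == Ok(receive_index + k as u64)
//   blob.slot()  == Ok(slot)
//   blob.id()    == Ok(*id)
//   blob.flags() == Ok(0)
```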
#[cfg(test)]
pub fn make_consecutive_blobs(
me_id: Pubkey,
@@ -525,10 +551,10 @@ mod tests {
pub fn blob_test() {
let mut b = Blob::default();
b.set_index(<u64>::max_value()).unwrap();
assert_eq!(b.get_index().unwrap(), <u64>::max_value());
assert_eq!(b.index().unwrap(), <u64>::max_value());
b.data_mut()[0] = 1;
assert_eq!(b.data()[0], 1);
assert_eq!(b.get_index().unwrap(), <u64>::max_value());
assert_eq!(b.index().unwrap(), <u64>::max_value());
assert_eq!(b.meta, Meta::default());
}
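A companion round-trip check for the new field, in the style of blob_test above (not part of this commit):

```rust
let mut b = Blob::default();
b.set_slot(42).unwrap();
assert_eq!(b.slot().unwrap(), 42);
// slot and index occupy disjoint byte ranges, so index stays untouched:
assert_eq!(b.index().unwrap(), 0);
```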

View File

@@ -78,13 +78,13 @@ impl ReplicateStage {
let mut res = Ok(());
let last_entry_id = {
let mut num_entries_to_write = entries.len();
let current_leader = bank
let (current_leader, _) = bank
.get_current_leader()
.expect("Scheduled leader id should never be unknown while processing entries");
for (i, entry) in entries.iter().enumerate() {
res = bank.process_entry(&entry);
let my_id = keypair.pubkey();
let scheduled_leader = bank
let (scheduled_leader, _) = bank
.get_current_leader()
.expect("Scheduled leader id should never be unknown while processing entries");
@@ -164,7 +164,7 @@ impl ReplicateStage {
let mut entry_height_ = entry_height;
let mut last_entry_id = None;
loop {
let leader_id = bank
let (leader_id, _) = bank
.get_current_leader()
.expect("Scheduled leader id should never be unknown at this point");

View File

@@ -279,7 +279,7 @@ pub mod tests {
let mut alice_ref_balance = starting_balance;
let mut msgs = Vec::new();
let mut cur_hash = Hash::default();
let mut blob_id = 0;
let mut blob_idx = 0;
let num_transfers = 10;
let transfer_amount = 501;
let bob_keypair = Keypair::new();
@@ -306,9 +306,9 @@ pub mod tests {
let mut b = SharedBlob::default();
{
let mut w = b.write().unwrap();
w.set_index(blob_id).unwrap();
blob_id += 1;
w.set_id(leader_id).unwrap();
w.set_index(blob_idx).unwrap();
blob_idx += 1;
w.set_id(&leader_id).unwrap();
let serialized_entry = serialize(&entry).unwrap();

View File

@@ -52,12 +52,9 @@ pub fn create_new_signed_vote_blob(
}
fn get_leader_tpu(bank: &Bank, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
let leader_id = {
if let Some(leader_id) = bank.get_current_leader() {
leader_id
} else {
return Err(Error::VoteError(VoteError::NoLeader));
}
let leader_id = match bank.get_current_leader() {
Some((leader_id, _)) => leader_id,
None => return Err(Error::VoteError(VoteError::NoLeader)),
};
let rcluster_info = cluster_info.read().unwrap();

View File

@@ -1,6 +1,6 @@
//! The `window` module defines data structures for storing the tail of the ledger.
//!
use cluster_info::{ClusterInfo, NodeInfo};
use cluster_info::ClusterInfo;
use counter::Counter;
use entry::Entry;
#[cfg(feature = "erasure")]
@@ -9,7 +9,6 @@ use leader_scheduler::LeaderScheduler;
use ledger::reconstruct_entries_from_blobs;
use log::Level;
use packet::SharedBlob;
use result::Result;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::mem;
@@ -27,7 +26,7 @@ pub struct WindowSlot {
impl WindowSlot {
fn blob_index(&self) -> Option<u64> {
match self.data {
Some(ref blob) => blob.read().unwrap().get_index().ok(),
Some(ref blob) => blob.read().unwrap().index().ok(),
None => None,
}
}
@@ -154,7 +153,7 @@ impl WindowUtil for Window {
ls_lock.max_height_for_leader(tick_height)
{
match ls_lock.get_scheduled_leader(next_leader_rotation_height) {
Some(leader_id) if leader_id == *id => is_next_leader = true,
Some((leader_id, _)) if leader_id == *id => is_next_leader = true,
// In the case that we are not in the current scope of the leader schedule
// window then either:
//
@@ -296,7 +295,7 @@ impl WindowUtil for Window {
c_or_d: &str,
) -> bool {
if let Some(old) = mem::replace(window_slot, Some(blob)) {
let is_dup = old.read().unwrap().get_index().unwrap() == pix;
let is_dup = old.read().unwrap().index().unwrap() == pix;
trace!(
"{}: occupied {} window slot {:}, is_dup: {}",
id,
@@ -341,7 +340,7 @@ impl WindowUtil for Window {
let k_data_blob;
let k_data_slot = &mut self[k].data;
if let Some(blob) = k_data_slot {
if blob.read().unwrap().get_index().unwrap() < *consumed {
if blob.read().unwrap().index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
@@ -407,26 +406,6 @@ pub fn default_window() -> Window {
(0..2048).map(|_| WindowSlot::default()).collect()
}
pub fn index_blobs(
node_info: &NodeInfo,
blobs: &[SharedBlob],
receive_index: &mut u64,
) -> Result<()> {
// enumerate all the blobs, those are the indices
trace!("{}: INDEX_BLOBS {}", node_info.id, blobs.len());
for (i, b) in blobs.iter().enumerate() {
// only leader should be broadcasting
let mut blob = b.write().unwrap();
blob.set_id(node_info.id)
.expect("set_id in pub fn broadcast");
blob.set_index(*receive_index + i as u64)
.expect("set_index in pub fn broadcast");
blob.set_flags(0).unwrap();
}
Ok(())
}
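Unlike this removed version, the replacement in the packet module takes the starting index by value rather than through &mut, so the caller presumably advances its own counter after the call; that bookkeeping is not shown in this diff, so the second line below is an assumption:

```rust
index_blobs(&blobs, &node_info.id, *receive_index, leader_slot);
*receive_index += blobs.len() as u64; // assumed caller-side bookkeeping
```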
#[cfg(test)]
mod test {
use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};

View File

@@ -59,17 +59,14 @@ fn add_block_to_retransmit_queue(
//we need to maintain a sequence window
trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index()
p.index()
.expect("get_index in fn add_block_to_retransmit_queue"),
p.get_id()
p.id()
.expect("get_id in trace! fn add_block_to_retransmit_queue"),
p.meta.addr(),
leader_id
);
if p.get_id()
.expect("get_id in fn add_block_to_retransmit_queue")
== leader_id
{
if p.id().expect("get_id in fn add_block_to_retransmit_queue") == leader_id {
//TODO
//need to copy the retransmitted blob
//otherwise we get into races with which thread
@@ -202,7 +199,7 @@ fn recv_window(
for b in dq {
let (pix, meta_size) = {
let p = b.read().unwrap();
(p.get_index()?, p.meta.size)
(p.index()?, p.meta.size)
};
pixs.push(pix);
@@ -495,8 +492,8 @@ mod test {
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
@@ -559,8 +556,8 @@ mod test {
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
@@ -579,8 +576,8 @@ mod test {
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}