remove Result<> from Blob accessors, add parent (#2608)

* remove Result<> from Blob accessors, add parent
* update chacha's golden
* fixup benches
This commit is contained in:
Rob Walker 2019-01-30 20:18:28 -08:00 committed by GitHub
parent a746969995
commit 1b50fbbc90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 204 additions and 249 deletions

View File

@ -19,17 +19,17 @@ fn bench_write_blobs(bench: &mut Bencher, blobs: &mut Vec<Blob>, ledger_path: &s
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
let index = blob.index();
db_ledger
.put_data_blob_bytes(
blob.slot().unwrap(),
blob.slot(),
index,
&blob.data[..BLOB_HEADER_SIZE + blob.size().unwrap()],
&blob.data[..BLOB_HEADER_SIZE + blob.size()],
)
.unwrap();
blob.set_index(index + num_blobs as u64).unwrap();
blob.set_index(index + num_blobs as u64);
}
});
@ -50,8 +50,8 @@ fn setup_read_bench(
// Convert the entries to blobs, write the blobs to the ledger
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64).unwrap();
b.set_slot(slot).unwrap();
b.set_index(index as u64);
b.set_slot(slot);
}
db_ledger
.write_blobs(&blobs)
@ -67,7 +67,7 @@ fn bench_write_small(bench: &mut Bencher) {
let entries = make_tiny_test_entries(num_entries);
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64).unwrap();
b.set_index(index as u64);
}
bench_write_blobs(bench, &mut blobs, &ledger_path);
}
@ -81,7 +81,7 @@ fn bench_write_big(bench: &mut Bencher) {
let entries = make_large_test_entries(num_entries);
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64).unwrap();
b.set_index(index as u64);
}
bench_write_blobs(bench, &mut blobs, &ledger_path);
@ -159,8 +159,8 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) {
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
blob.set_index(index + num_entries as u64).unwrap();
let index = blob.index();
blob.set_index(index + num_entries as u64);
}
db_ledger.write_blobs(&blobs).unwrap();
});
@ -181,12 +181,9 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) {
bench.iter(move || {
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index().unwrap();
let index = blob.read().unwrap().index();
db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write()
.unwrap()
.set_index(index + num_entries as u64)
.unwrap();
blob.write().unwrap().set_index(index + num_entries as u64);
}
});

View File

@ -163,12 +163,15 @@ mod tests {
let size = out_file.read_to_end(&mut buf).unwrap();
let mut hasher = Hasher::default();
hasher.hash(&buf[..size]);
assert_eq!(
hasher.result(),
Hash::new(&hex!(
"16b1159b112b11d7a2fb7b0471797ab079bce7e0e86b8a879474616abb61e5aa"
)),
use bs58;
// golden needs to be updated if blob stuff changes....
let golden = Hash::new(
&bs58::decode("BES6jpfVwayNKq9YZbYjbZbyX3GLzFzeQJ7fksm6LifE")
.into_vec()
.unwrap(),
);
assert_eq!(hasher.result(), golden,);
remove_file(out_path).unwrap();
}
}

View File

@ -537,10 +537,7 @@ impl ClusterInfo {
let s = obj.read().unwrap();
(s.my_data().clone(), peers)
};
blob.write()
.unwrap()
.set_id(&me.id)
.expect("set_id in pub fn retransmit");
blob.write().unwrap().set_id(&me.id);
let rblob = blob.read().unwrap();
trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders
@ -549,7 +546,7 @@ impl ClusterInfo {
debug!(
"{}: retransmit blob {} to {} {}",
me.id,
rblob.index().unwrap(),
rblob.index(),
v.id,
v.tvu,
);
@ -594,7 +591,7 @@ impl ClusterInfo {
trace!(
"{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
id,
blob.index().unwrap(),
blob.index(),
blob.meta.size,
ids_and_tvus,
blob.is_coding()
@ -1369,8 +1366,8 @@ mod tests {
{
let mut w_blob = blob.write().unwrap();
w_blob.set_size(data_size);
w_blob.set_index(1).expect("set_index()");
w_blob.set_slot(2).expect("set_slot()");
w_blob.set_index(1);
w_blob.set_slot(2);
w_blob.meta.size = data_size + BLOB_HEADER_SIZE;
}
@ -1382,8 +1379,8 @@ mod tests {
ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 1);
assert!(!rv.is_empty());
let v = rv[0].clone();
assert_eq!(v.read().unwrap().index().unwrap(), 1);
assert_eq!(v.read().unwrap().slot().unwrap(), 2);
assert_eq!(v.read().unwrap().index(), 1);
assert_eq!(v.read().unwrap().slot(), 2);
assert_eq!(v.read().unwrap().meta.size, BLOB_HEADER_SIZE + data_size);
}

View File

@ -383,8 +383,8 @@ impl DbLedger {
.enumerate()
.map(|(idx, entry)| {
let mut b = entry.borrow().to_blob();
b.set_index(idx as u64 + index).unwrap();
b.set_slot(slot).unwrap();
b.set_index(idx as u64 + index);
b.set_slot(slot);
b
})
.collect();
@ -403,12 +403,7 @@ impl DbLedger {
return Ok(vec![]);
}
new_blobs.sort_unstable_by(|b1, b2| {
b1.borrow()
.index()
.unwrap()
.cmp(&b2.borrow().index().unwrap())
});
new_blobs.sort_unstable_by(|b1, b2| b1.borrow().index().cmp(&b2.borrow().index()));
let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
@ -425,10 +420,10 @@ impl DbLedger {
// TODO: Handle if leader sends different blob for same index when the index > consumed
// The old window implementation would just replace that index.
let lowest_index = new_blobs[0].borrow().index()?;
let lowest_slot = new_blobs[0].borrow().slot()?;
let highest_index = new_blobs.last().unwrap().borrow().index()?;
let highest_slot = new_blobs.last().unwrap().borrow().slot()?;
let lowest_index = new_blobs[0].borrow().index();
let lowest_slot = new_blobs[0].borrow().slot();
let highest_index = new_blobs.last().unwrap().borrow().index();
let highest_slot = new_blobs.last().unwrap().borrow().slot();
if lowest_index < meta.consumed {
return Err(Error::DbLedgerError(DbLedgerError::BlobForIndexExists));
}
@ -458,7 +453,7 @@ impl DbLedger {
let mut found_blob = None;
while index_into_blob < new_blobs.len() {
let new_blob = new_blobs[index_into_blob].borrow();
let index = new_blob.index()?;
let index = new_blob.index();
// Skip over duplicate blobs with the same index and continue
// until we either find the index we're looking for, or detect
@ -477,9 +472,9 @@ impl DbLedger {
// If we found the blob in the new_blobs vector, process it, otherwise,
// look for the blob in the database.
if let Some(next_blob) = found_blob {
current_slot = next_blob.slot()?;
let serialized_entry_data = &next_blob.data
[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_blob.size()?];
current_slot = next_blob.slot();
let serialized_entry_data =
&next_blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_blob.size()];
// Verify entries can actually be reconstructed
deserialize(serialized_entry_data).expect(
"Blob made it past validation, so must be deserializable at this point",
@ -521,8 +516,8 @@ impl DbLedger {
for blob in new_blobs {
let blob = blob.borrow();
let key = DataCf::key(blob.slot()?, blob.index()?);
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
let key = DataCf::key(blob.slot(), blob.index());
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()];
batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?;
}
@ -539,7 +534,7 @@ impl DbLedger {
let mut meta = {
if let Some(meta) = self.meta_cf.get(&meta_key)? {
let first = blobs[0].read().unwrap();
assert_eq!(meta.consumed, first.index()?);
assert_eq!(meta.consumed, first.index());
meta
} else {
SlotMeta::new()
@ -548,18 +543,18 @@ impl DbLedger {
{
let last = blobs.last().unwrap().read().unwrap();
meta.consumed = last.index()? + 1;
meta.consumed_slot = last.slot()?;
meta.received = cmp::max(meta.received, last.index()? + 1);
meta.received_slot = cmp::max(meta.received_slot, last.index()?);
meta.consumed = last.index() + 1;
meta.consumed_slot = last.slot();
meta.received = cmp::max(meta.received, last.index() + 1);
meta.received_slot = cmp::max(meta.received_slot, last.index());
}
let mut batch = WriteBatch::default();
batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?;
for blob in blobs {
let blob = blob.read().unwrap();
let key = DataCf::key(blob.slot()?, blob.index()?);
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
let key = DataCf::key(blob.slot(), blob.index());
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()];
batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?;
}
self.db.write(batch)?;
@ -661,8 +656,8 @@ impl DbLedger {
let bytes = self.get_data_blob_bytes(slot, index)?;
Ok(bytes.map(|bytes| {
let blob = Blob::new(&bytes);
assert!(blob.slot().unwrap() == slot);
assert!(blob.index().unwrap() == index);
assert!(blob.slot() == slot);
assert!(blob.index() == index);
blob
}))
}
@ -854,9 +849,9 @@ where
.enumerate()
.map(|(idx, entry)| {
let mut b = entry.borrow().to_blob();
b.set_index(idx as u64).unwrap();
b.set_id(&keypair.pubkey()).unwrap();
b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
b.set_index(idx as u64);
b.set_id(&keypair.pubkey());
b.set_slot(DEFAULT_SLOT_HEIGHT);
b
})
.collect();
@ -1052,7 +1047,7 @@ mod tests {
let shared_blobs = entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
b.write().unwrap().set_index(i as u64);
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
@ -1095,7 +1090,7 @@ mod tests {
let entries = make_tiny_test_entries(num_blobs);
let shared_blobs = entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
b.write().unwrap().set_index(i as u64);
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -1132,7 +1127,7 @@ mod tests {
let entries = make_tiny_test_entries(num_blobs);
let shared_blobs = entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
b.write().unwrap().set_index(i as u64);
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -1182,8 +1177,8 @@ mod tests {
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(1 << (i * 8)).unwrap();
w_b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
w_b.set_index(1 << (i * 8));
w_b.set_slot(DEFAULT_SLOT_HEIGHT);
}
assert_eq!(
@ -1224,8 +1219,8 @@ mod tests {
let shared_blobs = original_entries.clone().to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(i as u64).unwrap();
w_b.set_slot(i as u64).unwrap();
w_b.set_index(i as u64);
w_b.set_slot(i as u64);
}
assert_eq!(
@ -1271,8 +1266,8 @@ mod tests {
for (i, b) in shared_blobs.iter().enumerate() {
let index = (i / 2) as u64;
let mut w_b = b.write().unwrap();
w_b.set_index(index).unwrap();
w_b.set_slot(index).unwrap();
w_b.set_index(index);
w_b.set_slot(index);
}
assert_eq!(
@ -1321,8 +1316,8 @@ mod tests {
let shared_blobs = original_entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(i as u64).unwrap();
w_b.set_slot(i as u64).unwrap();
w_b.set_index(i as u64);
w_b.set_slot(i as u64);
}
db_ledger
@ -1338,8 +1333,8 @@ mod tests {
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(num_entries + i as u64).unwrap();
w_b.set_slot(num_entries + i as u64).unwrap();
w_b.set_index(num_entries + i as u64);
w_b.set_slot(num_entries + i as u64);
}
db_ledger

View File

@ -125,10 +125,9 @@ pub fn retransmit_all_leader_blocks(
for b in dq {
// Check if the blob is from the scheduled leader for its slot. If so,
// add to the retransmit_queue
if let Ok(slot) = b.read().unwrap().slot() {
if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) {
add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
}
let slot = b.read().unwrap().slot();
if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) {
add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
}
}
@ -154,7 +153,7 @@ pub fn add_blob_to_retransmit_queue(
retransmit_queue: &mut Vec<SharedBlob>,
) {
let p = b.read().unwrap();
if p.id().expect("get_id in fn add_block_to_retransmit_queue") == leader_id {
if p.id() == leader_id {
let nv = SharedBlob::default();
{
let mut mnv = nv.write().unwrap();
@ -186,7 +185,7 @@ pub fn process_blob(
// Github issue: https://github.com/solana-labs/solana/issues/1899.
let (slot, pix) = {
let r_blob = blob.read().unwrap();
(r_blob.slot()?, r_blob.index()?)
(r_blob.slot(), r_blob.index())
};
let leader = leader_scheduler.read().unwrap().get_leader_for_slot(slot);
@ -199,11 +198,7 @@ pub fn process_blob(
// Insert the new blob into the window
let mut consumed_entries = if is_coding {
let blob = &blob.read().unwrap();
db_ledger.put_coding_blob_bytes(
slot,
pix,
&blob.data[..BLOB_HEADER_SIZE + blob.size().unwrap()],
)?;
db_ledger.put_coding_blob_bytes(slot, pix, &blob.data[..BLOB_HEADER_SIZE + blob.size()])?;
vec![]
} else {
db_ledger.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?
@ -280,8 +275,8 @@ fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Res
let c = c.read().unwrap();
db_ledger.put_coding_blob_bytes(
meta.consumed_slot,
c.index().unwrap(),
&c.data[..BLOB_HEADER_SIZE + c.size().unwrap()],
c.index(),
&c.data[..BLOB_HEADER_SIZE + c.size()],
)?;
}
@ -402,7 +397,7 @@ mod test {
let (blob_sender, blob_receiver) = channel();
// Expect blob from leader to be retransmitted
blob.write().unwrap().set_id(&leader).unwrap();
blob.write().unwrap().set_id(&leader);
retransmit_all_leader_blocks(&vec![blob.clone()], &leader_scheduler, &blob_sender)
.expect("Expect successful retransmit");
let output_blob = blob_receiver
@ -412,11 +407,11 @@ mod test {
// Retransmitted blob should be missing the leader id
assert_ne!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
// Set the leader in the retransmitted blob, should now match the original
output_blob[0].write().unwrap().set_id(&leader).unwrap();
output_blob[0].write().unwrap().set_id(&leader);
assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
// Expect blob from nonleader to not be retransmitted
blob.write().unwrap().set_id(&nonleader).unwrap();
blob.write().unwrap().set_id(&nonleader);
retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender)
.expect("Expect successful retransmit");
assert!(blob_receiver.try_recv().is_err());
@ -441,8 +436,8 @@ mod test {
const ONE: u64 = 1;
const OTHER: u64 = 4;
blobs[0].set_index(ONE).unwrap();
blobs[1].set_index(OTHER).unwrap();
blobs[0].set_index(ONE);
blobs[1].set_index(OTHER);
// Insert one blob at index = first_index
db_ledger.write_blobs(&blobs).unwrap();
@ -479,8 +474,8 @@ mod test {
let num_entries = 10;
let mut blobs = make_tiny_test_entries(num_entries).to_blobs();
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64 * gap).unwrap();
b.set_slot(slot).unwrap();
b.set_index(i as u64 * gap);
b.set_slot(slot);
}
db_ledger.write_blobs(&blobs).unwrap();
@ -635,7 +630,7 @@ mod test {
.get_coding_blob_bytes(slot_height, erased_index as u64)
.unwrap()
.unwrap()[BLOB_HEADER_SIZE..],
&erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
&erased_coding_l.data()[..erased_coding_l.size() as usize],
);
}

View File

@ -247,7 +247,7 @@ where
for blob in blobs.into_iter() {
let entry: Entry = {
let msg_size = blob.borrow().size()?;
let msg_size = blob.borrow().size();
deserialize(&blob.borrow().data()[..msg_size])?
};
@ -459,8 +459,8 @@ pub fn make_consecutive_blobs(
let mut index = start_height;
for blob in &blobs {
let mut blob = blob.write().unwrap();
blob.set_index(index).unwrap();
blob.set_id(id).unwrap();
blob.set_index(index);
blob.set_id(id);
blob.meta.set_addr(addr);
index += 1;
}

View File

@ -215,7 +215,7 @@ fn decode_blobs(
let mut data_size;
if n < NUM_DATA {
data_size = locks[n].data_size().unwrap() as usize;
data_size = locks[n].data_size() as usize;
data_size -= BLOB_HEADER_SIZE;
if data_size > BLOB_DATA_SIZE {
error!("corrupt data blob[{}] data_size: {}", idx, data_size);
@ -225,8 +225,8 @@ fn decode_blobs(
} else {
data_size = size;
idx -= NUM_CODING as u64;
locks[n].set_slot(slot).unwrap();
locks[n].set_index(idx).unwrap();
locks[n].set_slot(slot);
locks[n].set_index(idx);
if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
@ -327,18 +327,18 @@ impl CodingGenerator {
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
let index = data_blob.index().unwrap();
let slot = data_blob.slot().unwrap();
let id = data_blob.id().unwrap();
let index = data_blob.index();
let slot = data_blob.slot();
let id = data_blob.id();
let coding_blob = SharedBlob::default();
{
let mut coding_blob = coding_blob.write().unwrap();
coding_blob.set_index(index).unwrap();
coding_blob.set_slot(slot).unwrap();
coding_blob.set_id(&id).unwrap();
coding_blob.set_index(index);
coding_blob.set_slot(slot);
coding_blob.set_id(&id);
coding_blob.set_size(max_data_size);
coding_blob.set_coding().unwrap();
coding_blob.set_coding();
}
coding_blobs.push(coding_blob);
}
@ -583,7 +583,7 @@ pub mod test {
assert_eq!(i % NUM_DATA, NUM_DATA - 1);
assert_eq!(coding.len(), NUM_CODING);
let size = coding[0].read().unwrap().size().unwrap();
let size = coding[0].read().unwrap().size();
// toss one data and one coding
let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];
@ -649,9 +649,9 @@ pub mod test {
let data = data.read().unwrap();
db_ledger
.put_data_blob_bytes(
data.slot().unwrap(),
data.index().unwrap(),
&data.data[..data.data_size().unwrap() as usize],
data.slot(),
data.index(),
&data.data[..data.data_size() as usize],
)
.expect("Expected successful put into data column of ledger");
} else {
@ -664,17 +664,13 @@ pub mod test {
if let Some(ref coding) = slot.coding {
let coding_lock = coding.read().unwrap();
let index = coding_lock
.index()
.expect("Expected coding blob to have valid index");
let index = coding_lock.index();
let data_size = coding_lock
.size()
.expect("Expected coding blob to have valid data size");
let data_size = coding_lock.size();
db_ledger
.put_coding_blob_bytes(
coding_lock.slot().unwrap(),
coding_lock.slot(),
index,
&coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
)
@ -764,9 +760,9 @@ pub mod test {
if let Some(data) = &window[n].data {
let data_rl = data.read().unwrap();
let index = data_rl.index().unwrap();
let slot = data_rl.slot().unwrap();
let id = data_rl.id().unwrap();
let index = data_rl.index();
let slot = data_rl.slot();
let id = data_rl.id();
trace!(
"{} copying index {} id {:?} from data to coding",
@ -774,14 +770,12 @@ pub mod test {
index,
id
);
coding_wl.set_index(index).unwrap();
coding_wl.set_slot(slot).unwrap();
coding_wl.set_id(&id).unwrap();
coding_wl.set_index(index);
coding_wl.set_slot(slot);
coding_wl.set_id(&id);
}
coding_wl.set_size(max_data_size);
if coding_wl.set_coding().is_err() {
return Err(Error::ErasureError(ErasureError::EncodeError));
}
coding_wl.set_coding();
coding_blobs.push(coding.clone());
}
@ -904,7 +898,7 @@ pub mod test {
);
for b in blobs {
let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
window[idx].data = Some(b);
}
@ -936,7 +930,7 @@ pub mod test {
let blobs = generate_test_blobs(offset, num_blobs);
for b in blobs.into_iter() {
let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
window[idx].data = Some(b);
}
@ -994,13 +988,13 @@ pub mod test {
let ref_l2 = ref_l.read().unwrap();
let result = recovered_blob.read().unwrap();
assert_eq!(result.size().unwrap(), ref_l2.size().unwrap());
assert_eq!(result.size(), ref_l2.size());
assert_eq!(
result.data[..ref_l2.data_size().unwrap() as usize],
ref_l2.data[..ref_l2.data_size().unwrap() as usize]
result.data[..ref_l2.data_size() as usize],
ref_l2.data[..ref_l2.data_size() as usize]
);
assert_eq!(result.index().unwrap(), offset as u64);
assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64);
assert_eq!(result.index(), offset as u64);
assert_eq!(result.slot(), DEFAULT_SLOT_HEIGHT as u64);
}
drop(db_ledger);
DbLedger::destroy(&ledger_path)
@ -1047,26 +1041,26 @@ pub mod test {
let ref_l2 = ref_l.read().unwrap();
let result = recovered_data_blob.read().unwrap();
assert_eq!(result.size().unwrap(), ref_l2.size().unwrap());
assert_eq!(result.size(), ref_l2.size());
assert_eq!(
result.data[..ref_l2.data_size().unwrap() as usize],
ref_l2.data[..ref_l2.data_size().unwrap() as usize]
result.data[..ref_l2.data_size() as usize],
ref_l2.data[..ref_l2.data_size() as usize]
);
assert_eq!(result.index().unwrap(), coding_start as u64);
assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64);
assert_eq!(result.index(), coding_start as u64);
assert_eq!(result.slot(), DEFAULT_SLOT_HEIGHT as u64);
// Check the recovered erasure result
let ref_l = refwindowcoding.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
let result = recovered_coding_blob.read().unwrap();
assert_eq!(result.size().unwrap(), ref_l2.size().unwrap());
assert_eq!(result.size(), ref_l2.size());
assert_eq!(
result.data()[..ref_l2.size().unwrap() as usize],
ref_l2.data()[..ref_l2.size().unwrap() as usize]
result.data()[..ref_l2.size() as usize],
ref_l2.data()[..ref_l2.size() as usize]
);
assert_eq!(result.index().unwrap(), coding_start as u64);
assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64);
assert_eq!(result.index(), coding_start as u64);
assert_eq!(result.slot(), DEFAULT_SLOT_HEIGHT as u64);
}
drop(db_ledger);
DbLedger::destroy(&ledger_path)

View File

@ -2,8 +2,8 @@
use crate::counter::Counter;
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use crate::result::{Error, Result};
use bincode::{deserialize, serialize, serialize_into};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bincode::{serialize, serialize_into};
use byteorder::{ByteOrder, LittleEndian};
use log::Level;
use serde::Serialize;
pub use solana_sdk::packet::PACKET_DATA_SIZE;
@ -272,20 +272,28 @@ pub fn to_shared_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<Share
Ok(blobs)
}
const BLOB_SLOT_END: usize = size_of::<u64>();
const BLOB_INDEX_END: usize = BLOB_SLOT_END + size_of::<u64>();
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<Pubkey>();
const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::<u32>();
const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::<u64>();
macro_rules! align {
($x:expr, $align:expr) => {
$x + ($align - 1) & !($align - 1)
macro_rules! range {
($prev:expr, $type:ident) => {
$prev..$prev + size_of::<$type>()
};
}
const PARENT_RANGE: std::ops::Range<usize> = range!(0, u64);
const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32);
const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
macro_rules! align {
($x:expr, $align:expr) => {
$x + ($align * 8 - 1) & !($align * 8 - 1)
};
}
pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, 8);
pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64);
impl Blob {
pub fn new(data: &[u8]) -> Self {
@ -293,78 +301,61 @@ impl Blob {
let data_len = cmp::min(data.len(), blob.data.len());
let bytes = &data[..data_len];
blob.data[..data_len].copy_from_slice(bytes);
blob.meta.size = blob.data_size().expect("Expected valid data size") as usize;
blob.meta.size = blob.data_size() as usize;
blob
}
pub fn slot(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[0..BLOB_SLOT_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
pub fn parent(&self) -> u64 {
LittleEndian::read_u64(&self.data[PARENT_RANGE])
}
pub fn set_slot(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
self.data[..BLOB_SLOT_END].clone_from_slice(&wtr);
Ok(())
pub fn set_parent(&mut self, ix: u64) {
LittleEndian::write_u64(&mut self.data[PARENT_RANGE], ix);
}
pub fn index(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[BLOB_SLOT_END..BLOB_INDEX_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
pub fn slot(&self) -> u64 {
LittleEndian::read_u64(&self.data[SLOT_RANGE])
}
pub fn set_index(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
self.data[BLOB_SLOT_END..BLOB_INDEX_END].clone_from_slice(&wtr);
Ok(())
pub fn set_slot(&mut self, ix: u64) {
LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix);
}
pub fn index(&self) -> u64 {
LittleEndian::read_u64(&self.data[INDEX_RANGE])
}
pub fn set_index(&mut self, ix: u64) {
LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix);
}
/// sender id, we use this for identifying if its a blob from the leader that we should
/// retransmit. eventually blobs should have a signature that we can use for spam filtering
pub fn id(&self) -> Result<Pubkey> {
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
Ok(e)
pub fn id(&self) -> Pubkey {
Pubkey::new(&self.data[ID_RANGE])
}
pub fn set_id(&mut self, id: &Pubkey) -> Result<()> {
let wtr = serialize(id)?;
self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr);
Ok(())
pub fn set_id(&mut self, id: &Pubkey) {
self.data[ID_RANGE].copy_from_slice(id.as_ref())
}
pub fn flags(&self) -> Result<u32> {
let mut rdr = io::Cursor::new(&self.data[BLOB_ID_END..BLOB_FLAGS_END]);
let r = rdr.read_u32::<LittleEndian>()?;
Ok(r)
pub fn flags(&self) -> u32 {
LittleEndian::read_u32(&self.data[FLAGS_RANGE])
}
pub fn set_flags(&mut self, ix: u32) -> Result<()> {
let mut wtr = vec![];
wtr.write_u32::<LittleEndian>(ix)?;
self.data[BLOB_ID_END..BLOB_FLAGS_END].clone_from_slice(&wtr);
Ok(())
pub fn set_flags(&mut self, ix: u32) {
LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix);
}
pub fn is_coding(&self) -> bool {
(self.flags().unwrap() & BLOB_FLAG_IS_CODING) != 0
(self.flags() & BLOB_FLAG_IS_CODING) != 0
}
pub fn set_coding(&mut self) -> Result<()> {
let flags = self.flags().unwrap();
self.set_flags(flags | BLOB_FLAG_IS_CODING)
pub fn set_coding(&mut self) {
let flags = self.flags();
self.set_flags(flags | BLOB_FLAG_IS_CODING);
}
pub fn data_size(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
pub fn data_size(&self) -> u64 {
LittleEndian::read_u64(&self.data[SIZE_RANGE])
}
pub fn set_data_size(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
self.data[BLOB_FLAGS_END..BLOB_SIZE_END].clone_from_slice(&wtr);
Ok(())
pub fn set_data_size(&mut self, ix: u64) {
LittleEndian::write_u64(&mut self.data[SIZE_RANGE], ix);
}
pub fn data(&self) -> &[u8] {
@ -373,20 +364,20 @@ impl Blob {
pub fn data_mut(&mut self) -> &mut [u8] {
&mut self.data[BLOB_HEADER_SIZE..]
}
pub fn size(&self) -> Result<usize> {
let size = self.data_size()? as usize;
if size <= BLOB_HEADER_SIZE {
Err(Error::BlobError(BlobError::BadState))
} else if self.meta.size == size {
Ok(size - BLOB_HEADER_SIZE)
pub fn size(&self) -> usize {
let size = self.data_size() as usize;
if size > BLOB_HEADER_SIZE && size == self.meta.size {
size - BLOB_HEADER_SIZE
} else {
Err(Error::BlobError(BlobError::BadState))
0
}
}
pub fn set_size(&mut self, size: usize) {
let new_size = size + BLOB_HEADER_SIZE;
self.meta.size = new_size;
self.set_data_size(new_size as u64).unwrap();
self.set_data_size(new_size as u64);
}
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
@ -456,9 +447,9 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u
for (blob, slot) in blobs.iter().zip(slots) {
let mut blob = blob.write().unwrap();
blob.set_index(index).expect("set_index");
blob.set_slot(*slot).expect("set_slot");
blob.set_id(id).expect("set_id");
blob.set_index(index);
blob.set_slot(*slot);
blob.set_id(id);
index += 1;
}
@ -571,11 +562,11 @@ mod tests {
#[test]
pub fn blob_test() {
let mut b = Blob::default();
b.set_index(<u64>::max_value()).unwrap();
assert_eq!(b.index().unwrap(), <u64>::max_value());
b.set_index(<u64>::max_value());
assert_eq!(b.index(), <u64>::max_value());
b.data_mut()[0] = 1;
assert_eq!(b.data()[0], 1);
assert_eq!(b.index().unwrap(), <u64>::max_value());
assert_eq!(b.index(), <u64>::max_value());
assert_eq!(b.meta, Meta::default());
}

View File

@ -338,9 +338,9 @@ pub mod tests {
let b = SharedBlob::default();
{
let mut w = b.write().unwrap();
w.set_index(blob_idx).unwrap();
w.set_index(blob_idx);
blob_idx += 1;
w.set_id(&leader_id).unwrap();
w.set_id(&leader_id);
let serialized_entry = serialize(&entry).unwrap();

View File

@ -21,7 +21,7 @@ pub struct WindowSlot {
impl WindowSlot {
fn blob_index(&self) -> Option<u64> {
match self.data {
Some(ref blob) => blob.read().unwrap().index().ok(),
Some(ref blob) => Some(blob.read().unwrap().index()),
None => None,
}
}

View File

@ -86,7 +86,7 @@ fn recv_window(
for b in dq {
let (pix, meta_size) = {
let p = b.read().unwrap();
(p.index()?, p.meta.size)
(p.index(), p.meta.size)
};
trace!("{} window pix: {} size: {}", id, pix, meta_size);
@ -217,7 +217,6 @@ mod test {
use crate::entry::{make_consecutive_blobs, Entry};
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::{SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{blob_receiver, responder};
use crate::window_service::{repair_backoff, window_service};
use solana_sdk::hash::Hash;
@ -350,18 +349,12 @@ mod test {
tn.sockets.tvu.into_iter().map(Arc::new).collect();
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut msgs = Vec::new();
let blobs =
make_consecutive_blobs(&me_id, 14u64, 0, Default::default(), &tn.info.gossip);
for v in 0..10 {
let i = 9 - v;
let b = SharedBlob::default();
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.gossip);
}
msgs.push(b);
msgs.push(blobs[i].clone());
}
s_responder.send(msgs).expect("send");
@ -369,16 +362,7 @@ mod test {
let mut msgs1 = Vec::new();
for v in 1..5 {
let i = 9 + v;
let b = SharedBlob::default();
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.gossip);
}
msgs1.push(b);
msgs1.push(blobs[i].clone());
}
s_responder.send(msgs1).expect("send");
t_responder

View File

@ -1764,10 +1764,9 @@ fn test_broadcast_last_tick() {
info!("Checking a node...");
let mut last_tick_blob: SharedBlob = SharedBlob::default();
while let Ok(new_blobs) = receiver.try_recv() {
let last_blob = new_blobs.into_iter().find(|b| {
b.read().unwrap().index().expect("Expected index in blob")
== last_tick_entry_height - 2
});
let last_blob = new_blobs
.into_iter()
.find(|b| b.read().unwrap().index() == last_tick_entry_height - 2);
if let Some(last_blob) = last_blob {
last_tick_blob = last_blob;
break;

View File

@ -163,7 +163,7 @@ fn test_replicator_startup() {
if let Ok(blobs) = x {
for b in blobs {
let br = b.read().unwrap();
assert!(br.index().unwrap() == repair_index);
assert!(br.index() == repair_index);
let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap();
info!("entry: {:?}", entry);
assert_ne!(entry.id, Hash::default());