diff --git a/benches/db_ledger.rs b/benches/db_ledger.rs index 8a034bf630..ccced0fb5a 100644 --- a/benches/db_ledger.rs +++ b/benches/db_ledger.rs @@ -19,17 +19,17 @@ fn bench_write_blobs(bench: &mut Bencher, blobs: &mut Vec, ledger_path: &s bench.iter(move || { for blob in blobs.iter_mut() { - let index = blob.index().unwrap(); + let index = blob.index(); db_ledger .put_data_blob_bytes( - blob.slot().unwrap(), + blob.slot(), index, - &blob.data[..BLOB_HEADER_SIZE + blob.size().unwrap()], + &blob.data[..BLOB_HEADER_SIZE + blob.size()], ) .unwrap(); - blob.set_index(index + num_blobs as u64).unwrap(); + blob.set_index(index + num_blobs as u64); } }); @@ -50,8 +50,8 @@ fn setup_read_bench( // Convert the entries to blobs, write the blobs to the ledger let mut blobs = entries.to_blobs(); for (index, b) in blobs.iter_mut().enumerate() { - b.set_index(index as u64).unwrap(); - b.set_slot(slot).unwrap(); + b.set_index(index as u64); + b.set_slot(slot); } db_ledger .write_blobs(&blobs) @@ -67,7 +67,7 @@ fn bench_write_small(bench: &mut Bencher) { let entries = make_tiny_test_entries(num_entries); let mut blobs = entries.to_blobs(); for (index, b) in blobs.iter_mut().enumerate() { - b.set_index(index as u64).unwrap(); + b.set_index(index as u64); } bench_write_blobs(bench, &mut blobs, &ledger_path); } @@ -81,7 +81,7 @@ fn bench_write_big(bench: &mut Bencher) { let entries = make_large_test_entries(num_entries); let mut blobs = entries.to_blobs(); for (index, b) in blobs.iter_mut().enumerate() { - b.set_index(index as u64).unwrap(); + b.set_index(index as u64); } bench_write_blobs(bench, &mut blobs, &ledger_path); @@ -159,8 +159,8 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) { bench.iter(move || { for blob in blobs.iter_mut() { - let index = blob.index().unwrap(); - blob.set_index(index + num_entries as u64).unwrap(); + let index = blob.index(); + blob.set_index(index + num_entries as u64); } db_ledger.write_blobs(&blobs).unwrap(); }); @@ -181,12 +181,9 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) { bench.iter(move || { for blob in shared_blobs.iter_mut() { - let index = blob.read().unwrap().index().unwrap(); + let index = blob.read().unwrap().index(); db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap(); - blob.write() - .unwrap() - .set_index(index + num_entries as u64) - .unwrap(); + blob.write().unwrap().set_index(index + num_entries as u64); } }); diff --git a/src/chacha.rs b/src/chacha.rs index 41b2ad46f9..e9ec7b0a38 100644 --- a/src/chacha.rs +++ b/src/chacha.rs @@ -163,12 +163,15 @@ mod tests { let size = out_file.read_to_end(&mut buf).unwrap(); let mut hasher = Hasher::default(); hasher.hash(&buf[..size]); - assert_eq!( - hasher.result(), - Hash::new(&hex!( - "16b1159b112b11d7a2fb7b0471797ab079bce7e0e86b8a879474616abb61e5aa" - )), + + use bs58; + // golden needs to be updated if blob stuff changes.... + let golden = Hash::new( + &bs58::decode("BES6jpfVwayNKq9YZbYjbZbyX3GLzFzeQJ7fksm6LifE") + .into_vec() + .unwrap(), ); + assert_eq!(hasher.result(), golden,); remove_file(out_path).unwrap(); } } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index d65f329205..e56f5965e9 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -537,10 +537,7 @@ impl ClusterInfo { let s = obj.read().unwrap(); (s.my_data().clone(), peers) }; - blob.write() - .unwrap() - .set_id(&me.id) - .expect("set_id in pub fn retransmit"); + blob.write().unwrap().set_id(&me.id); let rblob = blob.read().unwrap(); trace!("retransmit orders {}", orders.len()); let errs: Vec<_> = orders @@ -549,7 +546,7 @@ impl ClusterInfo { debug!( "{}: retransmit blob {} to {} {}", me.id, - rblob.index().unwrap(), + rblob.index(), v.id, v.tvu, ); @@ -594,7 +591,7 @@ impl ClusterInfo { trace!( "{}: BROADCAST idx: {} sz: {} to {:?} coding: {}", id, - blob.index().unwrap(), + blob.index(), blob.meta.size, ids_and_tvus, blob.is_coding() @@ -1369,8 +1366,8 @@ mod tests { { let mut w_blob = blob.write().unwrap(); w_blob.set_size(data_size); - w_blob.set_index(1).expect("set_index()"); - w_blob.set_slot(2).expect("set_slot()"); + w_blob.set_index(1); + w_blob.set_slot(2); w_blob.meta.size = data_size + BLOB_HEADER_SIZE; } @@ -1382,8 +1379,8 @@ mod tests { ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 1); assert!(!rv.is_empty()); let v = rv[0].clone(); - assert_eq!(v.read().unwrap().index().unwrap(), 1); - assert_eq!(v.read().unwrap().slot().unwrap(), 2); + assert_eq!(v.read().unwrap().index(), 1); + assert_eq!(v.read().unwrap().slot(), 2); assert_eq!(v.read().unwrap().meta.size, BLOB_HEADER_SIZE + data_size); } diff --git a/src/db_ledger.rs b/src/db_ledger.rs index d68954b057..0ddbaafde0 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -383,8 +383,8 @@ impl DbLedger { .enumerate() .map(|(idx, entry)| { let mut b = entry.borrow().to_blob(); - b.set_index(idx as u64 + index).unwrap(); - b.set_slot(slot).unwrap(); + b.set_index(idx as u64 + index); + b.set_slot(slot); b }) .collect(); @@ -403,12 +403,7 @@ impl DbLedger { return Ok(vec![]); } - new_blobs.sort_unstable_by(|b1, b2| { - b1.borrow() - .index() - .unwrap() - .cmp(&b2.borrow().index().unwrap()) - }); + new_blobs.sort_unstable_by(|b1, b2| b1.borrow().index().cmp(&b2.borrow().index())); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); @@ -425,10 +420,10 @@ impl DbLedger { // TODO: Handle if leader sends different blob for same index when the index > consumed // The old window implementation would just replace that index. - let lowest_index = new_blobs[0].borrow().index()?; - let lowest_slot = new_blobs[0].borrow().slot()?; - let highest_index = new_blobs.last().unwrap().borrow().index()?; - let highest_slot = new_blobs.last().unwrap().borrow().slot()?; + let lowest_index = new_blobs[0].borrow().index(); + let lowest_slot = new_blobs[0].borrow().slot(); + let highest_index = new_blobs.last().unwrap().borrow().index(); + let highest_slot = new_blobs.last().unwrap().borrow().slot(); if lowest_index < meta.consumed { return Err(Error::DbLedgerError(DbLedgerError::BlobForIndexExists)); } @@ -458,7 +453,7 @@ impl DbLedger { let mut found_blob = None; while index_into_blob < new_blobs.len() { let new_blob = new_blobs[index_into_blob].borrow(); - let index = new_blob.index()?; + let index = new_blob.index(); // Skip over duplicate blobs with the same index and continue // until we either find the index we're looking for, or detect @@ -477,9 +472,9 @@ impl DbLedger { // If we found the blob in the new_blobs vector, process it, otherwise, // look for the blob in the database. if let Some(next_blob) = found_blob { - current_slot = next_blob.slot()?; - let serialized_entry_data = &next_blob.data - [BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_blob.size()?]; + current_slot = next_blob.slot(); + let serialized_entry_data = + &next_blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_blob.size()]; // Verify entries can actually be reconstructed deserialize(serialized_entry_data).expect( "Blob made it past validation, so must be deserializable at this point", @@ -521,8 +516,8 @@ impl DbLedger { for blob in new_blobs { let blob = blob.borrow(); - let key = DataCf::key(blob.slot()?, blob.index()?); - let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?]; + let key = DataCf::key(blob.slot(), blob.index()); + let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; } @@ -539,7 +534,7 @@ impl DbLedger { let mut meta = { if let Some(meta) = self.meta_cf.get(&meta_key)? { let first = blobs[0].read().unwrap(); - assert_eq!(meta.consumed, first.index()?); + assert_eq!(meta.consumed, first.index()); meta } else { SlotMeta::new() @@ -548,18 +543,18 @@ impl DbLedger { { let last = blobs.last().unwrap().read().unwrap(); - meta.consumed = last.index()? + 1; - meta.consumed_slot = last.slot()?; - meta.received = cmp::max(meta.received, last.index()? + 1); - meta.received_slot = cmp::max(meta.received_slot, last.index()?); + meta.consumed = last.index() + 1; + meta.consumed_slot = last.slot(); + meta.received = cmp::max(meta.received, last.index() + 1); + meta.received_slot = cmp::max(meta.received_slot, last.index()); } let mut batch = WriteBatch::default(); batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?; for blob in blobs { let blob = blob.read().unwrap(); - let key = DataCf::key(blob.slot()?, blob.index()?); - let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?]; + let key = DataCf::key(blob.slot(), blob.index()); + let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; } self.db.write(batch)?; @@ -661,8 +656,8 @@ impl DbLedger { let bytes = self.get_data_blob_bytes(slot, index)?; Ok(bytes.map(|bytes| { let blob = Blob::new(&bytes); - assert!(blob.slot().unwrap() == slot); - assert!(blob.index().unwrap() == index); + assert!(blob.slot() == slot); + assert!(blob.index() == index); blob })) } @@ -854,9 +849,9 @@ where .enumerate() .map(|(idx, entry)| { let mut b = entry.borrow().to_blob(); - b.set_index(idx as u64).unwrap(); - b.set_id(&keypair.pubkey()).unwrap(); - b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap(); + b.set_index(idx as u64); + b.set_id(&keypair.pubkey()); + b.set_slot(DEFAULT_SLOT_HEIGHT); b }) .collect(); @@ -1052,7 +1047,7 @@ mod tests { let shared_blobs = entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { - b.write().unwrap().set_index(i as u64).unwrap(); + b.write().unwrap().set_index(i as u64); } let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); @@ -1095,7 +1090,7 @@ mod tests { let entries = make_tiny_test_entries(num_blobs); let shared_blobs = entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { - b.write().unwrap().set_index(i as u64).unwrap(); + b.write().unwrap().set_index(i as u64); } let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); @@ -1132,7 +1127,7 @@ mod tests { let entries = make_tiny_test_entries(num_blobs); let shared_blobs = entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { - b.write().unwrap().set_index(i as u64).unwrap(); + b.write().unwrap().set_index(i as u64); } let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); @@ -1182,8 +1177,8 @@ mod tests { for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); - w_b.set_index(1 << (i * 8)).unwrap(); - w_b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap(); + w_b.set_index(1 << (i * 8)); + w_b.set_slot(DEFAULT_SLOT_HEIGHT); } assert_eq!( @@ -1224,8 +1219,8 @@ mod tests { let shared_blobs = original_entries.clone().to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); - w_b.set_index(i as u64).unwrap(); - w_b.set_slot(i as u64).unwrap(); + w_b.set_index(i as u64); + w_b.set_slot(i as u64); } assert_eq!( @@ -1271,8 +1266,8 @@ mod tests { for (i, b) in shared_blobs.iter().enumerate() { let index = (i / 2) as u64; let mut w_b = b.write().unwrap(); - w_b.set_index(index).unwrap(); - w_b.set_slot(index).unwrap(); + w_b.set_index(index); + w_b.set_slot(index); } assert_eq!( @@ -1321,8 +1316,8 @@ mod tests { let shared_blobs = original_entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); - w_b.set_index(i as u64).unwrap(); - w_b.set_slot(i as u64).unwrap(); + w_b.set_index(i as u64); + w_b.set_slot(i as u64); } db_ledger @@ -1338,8 +1333,8 @@ mod tests { for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); - w_b.set_index(num_entries + i as u64).unwrap(); - w_b.set_slot(num_entries + i as u64).unwrap(); + w_b.set_index(num_entries + i as u64); + w_b.set_slot(num_entries + i as u64); } db_ledger diff --git a/src/db_window.rs b/src/db_window.rs index e2caed6bfd..3e4b012df9 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -125,10 +125,9 @@ pub fn retransmit_all_leader_blocks( for b in dq { // Check if the blob is from the scheduled leader for its slot. If so, // add to the retransmit_queue - if let Ok(slot) = b.read().unwrap().slot() { - if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) { - add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); - } + let slot = b.read().unwrap().slot(); + if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) { + add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); } } @@ -154,7 +153,7 @@ pub fn add_blob_to_retransmit_queue( retransmit_queue: &mut Vec, ) { let p = b.read().unwrap(); - if p.id().expect("get_id in fn add_block_to_retransmit_queue") == leader_id { + if p.id() == leader_id { let nv = SharedBlob::default(); { let mut mnv = nv.write().unwrap(); @@ -186,7 +185,7 @@ pub fn process_blob( // Github issue: https://github.com/solana-labs/solana/issues/1899. let (slot, pix) = { let r_blob = blob.read().unwrap(); - (r_blob.slot()?, r_blob.index()?) + (r_blob.slot(), r_blob.index()) }; let leader = leader_scheduler.read().unwrap().get_leader_for_slot(slot); @@ -199,11 +198,7 @@ pub fn process_blob( // Insert the new blob into the window let mut consumed_entries = if is_coding { let blob = &blob.read().unwrap(); - db_ledger.put_coding_blob_bytes( - slot, - pix, - &blob.data[..BLOB_HEADER_SIZE + blob.size().unwrap()], - )?; + db_ledger.put_coding_blob_bytes(slot, pix, &blob.data[..BLOB_HEADER_SIZE + blob.size()])?; vec![] } else { db_ledger.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])? @@ -280,8 +275,8 @@ fn try_erasure(db_ledger: &Arc, consume_queue: &mut Vec) -> Res let c = c.read().unwrap(); db_ledger.put_coding_blob_bytes( meta.consumed_slot, - c.index().unwrap(), - &c.data[..BLOB_HEADER_SIZE + c.size().unwrap()], + c.index(), + &c.data[..BLOB_HEADER_SIZE + c.size()], )?; } @@ -402,7 +397,7 @@ mod test { let (blob_sender, blob_receiver) = channel(); // Expect blob from leader to be retransmitted - blob.write().unwrap().set_id(&leader).unwrap(); + blob.write().unwrap().set_id(&leader); retransmit_all_leader_blocks(&vec![blob.clone()], &leader_scheduler, &blob_sender) .expect("Expect successful retransmit"); let output_blob = blob_receiver @@ -412,11 +407,11 @@ mod test { // Retransmitted blob should be missing the leader id assert_ne!(*output_blob[0].read().unwrap(), *blob.read().unwrap()); // Set the leader in the retransmitted blob, should now match the original - output_blob[0].write().unwrap().set_id(&leader).unwrap(); + output_blob[0].write().unwrap().set_id(&leader); assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap()); // Expect blob from nonleader to not be retransmitted - blob.write().unwrap().set_id(&nonleader).unwrap(); + blob.write().unwrap().set_id(&nonleader); retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender) .expect("Expect successful retransmit"); assert!(blob_receiver.try_recv().is_err()); @@ -441,8 +436,8 @@ mod test { const ONE: u64 = 1; const OTHER: u64 = 4; - blobs[0].set_index(ONE).unwrap(); - blobs[1].set_index(OTHER).unwrap(); + blobs[0].set_index(ONE); + blobs[1].set_index(OTHER); // Insert one blob at index = first_index db_ledger.write_blobs(&blobs).unwrap(); @@ -479,8 +474,8 @@ mod test { let num_entries = 10; let mut blobs = make_tiny_test_entries(num_entries).to_blobs(); for (i, b) in blobs.iter_mut().enumerate() { - b.set_index(i as u64 * gap).unwrap(); - b.set_slot(slot).unwrap(); + b.set_index(i as u64 * gap); + b.set_slot(slot); } db_ledger.write_blobs(&blobs).unwrap(); @@ -635,7 +630,7 @@ mod test { .get_coding_blob_bytes(slot_height, erased_index as u64) .unwrap() .unwrap()[BLOB_HEADER_SIZE..], - &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize], + &erased_coding_l.data()[..erased_coding_l.size() as usize], ); } diff --git a/src/entry.rs b/src/entry.rs index 7ae5682f0c..ef6bda2397 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -247,7 +247,7 @@ where for blob in blobs.into_iter() { let entry: Entry = { - let msg_size = blob.borrow().size()?; + let msg_size = blob.borrow().size(); deserialize(&blob.borrow().data()[..msg_size])? }; @@ -459,8 +459,8 @@ pub fn make_consecutive_blobs( let mut index = start_height; for blob in &blobs { let mut blob = blob.write().unwrap(); - blob.set_index(index).unwrap(); - blob.set_id(id).unwrap(); + blob.set_index(index); + blob.set_id(id); blob.meta.set_addr(addr); index += 1; } diff --git a/src/erasure.rs b/src/erasure.rs index 04f67e0b44..62e8acb300 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -215,7 +215,7 @@ fn decode_blobs( let mut data_size; if n < NUM_DATA { - data_size = locks[n].data_size().unwrap() as usize; + data_size = locks[n].data_size() as usize; data_size -= BLOB_HEADER_SIZE; if data_size > BLOB_DATA_SIZE { error!("corrupt data blob[{}] data_size: {}", idx, data_size); @@ -225,8 +225,8 @@ fn decode_blobs( } else { data_size = size; idx -= NUM_CODING as u64; - locks[n].set_slot(slot).unwrap(); - locks[n].set_index(idx).unwrap(); + locks[n].set_slot(slot); + locks[n].set_index(idx); if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { error!("corrupt coding blob[{}] data_size: {}", idx, data_size); @@ -327,18 +327,18 @@ impl CodingGenerator { let mut coding_blobs = Vec::with_capacity(NUM_CODING); for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] { - let index = data_blob.index().unwrap(); - let slot = data_blob.slot().unwrap(); - let id = data_blob.id().unwrap(); + let index = data_blob.index(); + let slot = data_blob.slot(); + let id = data_blob.id(); let coding_blob = SharedBlob::default(); { let mut coding_blob = coding_blob.write().unwrap(); - coding_blob.set_index(index).unwrap(); - coding_blob.set_slot(slot).unwrap(); - coding_blob.set_id(&id).unwrap(); + coding_blob.set_index(index); + coding_blob.set_slot(slot); + coding_blob.set_id(&id); coding_blob.set_size(max_data_size); - coding_blob.set_coding().unwrap(); + coding_blob.set_coding(); } coding_blobs.push(coding_blob); } @@ -583,7 +583,7 @@ pub mod test { assert_eq!(i % NUM_DATA, NUM_DATA - 1); assert_eq!(coding.len(), NUM_CODING); - let size = coding[0].read().unwrap().size().unwrap(); + let size = coding[0].read().unwrap().size(); // toss one data and one coding let erasures: Vec = vec![0, NUM_DATA as i32, -1]; @@ -649,9 +649,9 @@ pub mod test { let data = data.read().unwrap(); db_ledger .put_data_blob_bytes( - data.slot().unwrap(), - data.index().unwrap(), - &data.data[..data.data_size().unwrap() as usize], + data.slot(), + data.index(), + &data.data[..data.data_size() as usize], ) .expect("Expected successful put into data column of ledger"); } else { @@ -664,17 +664,13 @@ pub mod test { if let Some(ref coding) = slot.coding { let coding_lock = coding.read().unwrap(); - let index = coding_lock - .index() - .expect("Expected coding blob to have valid index"); + let index = coding_lock.index(); - let data_size = coding_lock - .size() - .expect("Expected coding blob to have valid data size"); + let data_size = coding_lock.size(); db_ledger .put_coding_blob_bytes( - coding_lock.slot().unwrap(), + coding_lock.slot(), index, &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE], ) @@ -764,9 +760,9 @@ pub mod test { if let Some(data) = &window[n].data { let data_rl = data.read().unwrap(); - let index = data_rl.index().unwrap(); - let slot = data_rl.slot().unwrap(); - let id = data_rl.id().unwrap(); + let index = data_rl.index(); + let slot = data_rl.slot(); + let id = data_rl.id(); trace!( "{} copying index {} id {:?} from data to coding", @@ -774,14 +770,12 @@ pub mod test { index, id ); - coding_wl.set_index(index).unwrap(); - coding_wl.set_slot(slot).unwrap(); - coding_wl.set_id(&id).unwrap(); + coding_wl.set_index(index); + coding_wl.set_slot(slot); + coding_wl.set_id(&id); } coding_wl.set_size(max_data_size); - if coding_wl.set_coding().is_err() { - return Err(Error::ErasureError(ErasureError::EncodeError)); - } + coding_wl.set_coding(); coding_blobs.push(coding.clone()); } @@ -904,7 +898,7 @@ pub mod test { ); for b in blobs { - let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; + let idx = b.read().unwrap().index() as usize % WINDOW_SIZE; window[idx].data = Some(b); } @@ -936,7 +930,7 @@ pub mod test { let blobs = generate_test_blobs(offset, num_blobs); for b in blobs.into_iter() { - let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; + let idx = b.read().unwrap().index() as usize % WINDOW_SIZE; window[idx].data = Some(b); } @@ -994,13 +988,13 @@ pub mod test { let ref_l2 = ref_l.read().unwrap(); let result = recovered_blob.read().unwrap(); - assert_eq!(result.size().unwrap(), ref_l2.size().unwrap()); + assert_eq!(result.size(), ref_l2.size()); assert_eq!( - result.data[..ref_l2.data_size().unwrap() as usize], - ref_l2.data[..ref_l2.data_size().unwrap() as usize] + result.data[..ref_l2.data_size() as usize], + ref_l2.data[..ref_l2.data_size() as usize] ); - assert_eq!(result.index().unwrap(), offset as u64); - assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64); + assert_eq!(result.index(), offset as u64); + assert_eq!(result.slot(), DEFAULT_SLOT_HEIGHT as u64); } drop(db_ledger); DbLedger::destroy(&ledger_path) @@ -1047,26 +1041,26 @@ pub mod test { let ref_l2 = ref_l.read().unwrap(); let result = recovered_data_blob.read().unwrap(); - assert_eq!(result.size().unwrap(), ref_l2.size().unwrap()); + assert_eq!(result.size(), ref_l2.size()); assert_eq!( - result.data[..ref_l2.data_size().unwrap() as usize], - ref_l2.data[..ref_l2.data_size().unwrap() as usize] + result.data[..ref_l2.data_size() as usize], + ref_l2.data[..ref_l2.data_size() as usize] ); - assert_eq!(result.index().unwrap(), coding_start as u64); - assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64); + assert_eq!(result.index(), coding_start as u64); + assert_eq!(result.slot(), DEFAULT_SLOT_HEIGHT as u64); // Check the recovered erasure result let ref_l = refwindowcoding.clone().unwrap(); let ref_l2 = ref_l.read().unwrap(); let result = recovered_coding_blob.read().unwrap(); - assert_eq!(result.size().unwrap(), ref_l2.size().unwrap()); + assert_eq!(result.size(), ref_l2.size()); assert_eq!( - result.data()[..ref_l2.size().unwrap() as usize], - ref_l2.data()[..ref_l2.size().unwrap() as usize] + result.data()[..ref_l2.size() as usize], + ref_l2.data()[..ref_l2.size() as usize] ); - assert_eq!(result.index().unwrap(), coding_start as u64); - assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64); + assert_eq!(result.index(), coding_start as u64); + assert_eq!(result.slot(), DEFAULT_SLOT_HEIGHT as u64); } drop(db_ledger); DbLedger::destroy(&ledger_path) diff --git a/src/packet.rs b/src/packet.rs index 32e5419727..eb9596d149 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -2,8 +2,8 @@ use crate::counter::Counter; use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; use crate::result::{Error, Result}; -use bincode::{deserialize, serialize, serialize_into}; -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use bincode::{serialize, serialize_into}; +use byteorder::{ByteOrder, LittleEndian}; use log::Level; use serde::Serialize; pub use solana_sdk::packet::PACKET_DATA_SIZE; @@ -272,20 +272,28 @@ pub fn to_shared_blobs(rsps: Vec<(T, SocketAddr)>) -> Result(); -const BLOB_INDEX_END: usize = BLOB_SLOT_END + size_of::(); -const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::(); -const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::(); -const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::(); - -macro_rules! align { - ($x:expr, $align:expr) => { - $x + ($align - 1) & !($align - 1) +macro_rules! range { + ($prev:expr, $type:ident) => { + $prev..$prev + size_of::<$type>() }; } +const PARENT_RANGE: std::ops::Range = range!(0, u64); +const SLOT_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); +const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); +const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); +const FLAGS_RANGE: std::ops::Range = range!(ID_RANGE.end, u32); +const SIZE_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, u64); + +macro_rules! align { + ($x:expr, $align:expr) => { + $x + ($align * 8 - 1) & !($align * 8 - 1) + }; +} + +pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, 8); + pub const BLOB_FLAG_IS_CODING: u32 = 0x1; -pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64); impl Blob { pub fn new(data: &[u8]) -> Self { @@ -293,78 +301,61 @@ impl Blob { let data_len = cmp::min(data.len(), blob.data.len()); let bytes = &data[..data_len]; blob.data[..data_len].copy_from_slice(bytes); - blob.meta.size = blob.data_size().expect("Expected valid data size") as usize; + blob.meta.size = blob.data_size() as usize; blob } - pub fn slot(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[0..BLOB_SLOT_END]); - let r = rdr.read_u64::()?; - Ok(r) + pub fn parent(&self) -> u64 { + LittleEndian::read_u64(&self.data[PARENT_RANGE]) } - pub fn set_slot(&mut self, ix: u64) -> Result<()> { - let mut wtr = vec![]; - wtr.write_u64::(ix)?; - self.data[..BLOB_SLOT_END].clone_from_slice(&wtr); - Ok(()) + pub fn set_parent(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[PARENT_RANGE], ix); } - pub fn index(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[BLOB_SLOT_END..BLOB_INDEX_END]); - let r = rdr.read_u64::()?; - Ok(r) + pub fn slot(&self) -> u64 { + LittleEndian::read_u64(&self.data[SLOT_RANGE]) } - pub fn set_index(&mut self, ix: u64) -> Result<()> { - let mut wtr = vec![]; - wtr.write_u64::(ix)?; - self.data[BLOB_SLOT_END..BLOB_INDEX_END].clone_from_slice(&wtr); - Ok(()) + pub fn set_slot(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix); } + pub fn index(&self) -> u64 { + LittleEndian::read_u64(&self.data[INDEX_RANGE]) + } + pub fn set_index(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); + } + /// sender id, we use this for identifying if its a blob from the leader that we should /// retransmit. eventually blobs should have a signature that we can use for spam filtering - pub fn id(&self) -> Result { - let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?; - Ok(e) + pub fn id(&self) -> Pubkey { + Pubkey::new(&self.data[ID_RANGE]) } - pub fn set_id(&mut self, id: &Pubkey) -> Result<()> { - let wtr = serialize(id)?; - self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr); - Ok(()) + pub fn set_id(&mut self, id: &Pubkey) { + self.data[ID_RANGE].copy_from_slice(id.as_ref()) } - pub fn flags(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[BLOB_ID_END..BLOB_FLAGS_END]); - let r = rdr.read_u32::()?; - Ok(r) + pub fn flags(&self) -> u32 { + LittleEndian::read_u32(&self.data[FLAGS_RANGE]) } - - pub fn set_flags(&mut self, ix: u32) -> Result<()> { - let mut wtr = vec![]; - wtr.write_u32::(ix)?; - self.data[BLOB_ID_END..BLOB_FLAGS_END].clone_from_slice(&wtr); - Ok(()) + pub fn set_flags(&mut self, ix: u32) { + LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix); } pub fn is_coding(&self) -> bool { - (self.flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 + (self.flags() & BLOB_FLAG_IS_CODING) != 0 } - pub fn set_coding(&mut self) -> Result<()> { - let flags = self.flags().unwrap(); - self.set_flags(flags | BLOB_FLAG_IS_CODING) + pub fn set_coding(&mut self) { + let flags = self.flags(); + self.set_flags(flags | BLOB_FLAG_IS_CODING); } - pub fn data_size(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]); - let r = rdr.read_u64::()?; - Ok(r) + pub fn data_size(&self) -> u64 { + LittleEndian::read_u64(&self.data[SIZE_RANGE]) } - pub fn set_data_size(&mut self, ix: u64) -> Result<()> { - let mut wtr = vec![]; - wtr.write_u64::(ix)?; - self.data[BLOB_FLAGS_END..BLOB_SIZE_END].clone_from_slice(&wtr); - Ok(()) + pub fn set_data_size(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[SIZE_RANGE], ix); } pub fn data(&self) -> &[u8] { @@ -373,20 +364,20 @@ impl Blob { pub fn data_mut(&mut self) -> &mut [u8] { &mut self.data[BLOB_HEADER_SIZE..] } - pub fn size(&self) -> Result { - let size = self.data_size()? as usize; - if size <= BLOB_HEADER_SIZE { - Err(Error::BlobError(BlobError::BadState)) - } else if self.meta.size == size { - Ok(size - BLOB_HEADER_SIZE) + pub fn size(&self) -> usize { + let size = self.data_size() as usize; + + if size > BLOB_HEADER_SIZE && size == self.meta.size { + size - BLOB_HEADER_SIZE } else { - Err(Error::BlobError(BlobError::BadState)) + 0 } } + pub fn set_size(&mut self, size: usize) { let new_size = size + BLOB_HEADER_SIZE; self.meta.size = new_size; - self.set_data_size(new_size as u64).unwrap(); + self.set_data_size(new_size as u64); } pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { @@ -456,9 +447,9 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u for (blob, slot) in blobs.iter().zip(slots) { let mut blob = blob.write().unwrap(); - blob.set_index(index).expect("set_index"); - blob.set_slot(*slot).expect("set_slot"); - blob.set_id(id).expect("set_id"); + blob.set_index(index); + blob.set_slot(*slot); + blob.set_id(id); index += 1; } @@ -571,11 +562,11 @@ mod tests { #[test] pub fn blob_test() { let mut b = Blob::default(); - b.set_index(::max_value()).unwrap(); - assert_eq!(b.index().unwrap(), ::max_value()); + b.set_index(::max_value()); + assert_eq!(b.index(), ::max_value()); b.data_mut()[0] = 1; assert_eq!(b.data()[0], 1); - assert_eq!(b.index().unwrap(), ::max_value()); + assert_eq!(b.index(), ::max_value()); assert_eq!(b.meta, Meta::default()); } diff --git a/src/tvu.rs b/src/tvu.rs index 3a1cca9be1..cd77a6688d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -338,9 +338,9 @@ pub mod tests { let b = SharedBlob::default(); { let mut w = b.write().unwrap(); - w.set_index(blob_idx).unwrap(); + w.set_index(blob_idx); blob_idx += 1; - w.set_id(&leader_id).unwrap(); + w.set_id(&leader_id); let serialized_entry = serialize(&entry).unwrap(); diff --git a/src/window.rs b/src/window.rs index 36babc4ebe..541ac7fa0b 100644 --- a/src/window.rs +++ b/src/window.rs @@ -21,7 +21,7 @@ pub struct WindowSlot { impl WindowSlot { fn blob_index(&self) -> Option { match self.data { - Some(ref blob) => blob.read().unwrap().index().ok(), + Some(ref blob) => Some(blob.read().unwrap().index()), None => None, } } diff --git a/src/window_service.rs b/src/window_service.rs index 94a125a6a5..1f6dc5e405 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -86,7 +86,7 @@ fn recv_window( for b in dq { let (pix, meta_size) = { let p = b.read().unwrap(); - (p.index()?, p.meta.size) + (p.index(), p.meta.size) }; trace!("{} window pix: {} size: {}", id, pix, meta_size); @@ -217,7 +217,6 @@ mod test { use crate::entry::{make_consecutive_blobs, Entry}; use crate::leader_scheduler::LeaderScheduler; - use crate::packet::{SharedBlob, PACKET_DATA_SIZE}; use crate::streamer::{blob_receiver, responder}; use crate::window_service::{repair_backoff, window_service}; use solana_sdk::hash::Hash; @@ -350,18 +349,12 @@ mod test { tn.sockets.tvu.into_iter().map(Arc::new).collect(); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut msgs = Vec::new(); + let blobs = + make_consecutive_blobs(&me_id, 14u64, 0, Default::default(), &tn.info.gossip); + for v in 0..10 { let i = 9 - v; - let b = SharedBlob::default(); - { - let mut w = b.write().unwrap(); - w.set_index(i).unwrap(); - w.set_id(&me_id).unwrap(); - assert_eq!(i, w.index().unwrap()); - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.info.gossip); - } - msgs.push(b); + msgs.push(blobs[i].clone()); } s_responder.send(msgs).expect("send"); @@ -369,16 +362,7 @@ mod test { let mut msgs1 = Vec::new(); for v in 1..5 { let i = 9 + v; - let b = SharedBlob::default(); - { - let mut w = b.write().unwrap(); - w.set_index(i).unwrap(); - w.set_id(&me_id).unwrap(); - assert_eq!(i, w.index().unwrap()); - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.info.gossip); - } - msgs1.push(b); + msgs1.push(blobs[i].clone()); } s_responder.send(msgs1).expect("send"); t_responder diff --git a/tests/multinode.rs b/tests/multinode.rs index d17a58726d..a90817fd7c 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1764,10 +1764,9 @@ fn test_broadcast_last_tick() { info!("Checking a node..."); let mut last_tick_blob: SharedBlob = SharedBlob::default(); while let Ok(new_blobs) = receiver.try_recv() { - let last_blob = new_blobs.into_iter().find(|b| { - b.read().unwrap().index().expect("Expected index in blob") - == last_tick_entry_height - 2 - }); + let last_blob = new_blobs + .into_iter() + .find(|b| b.read().unwrap().index() == last_tick_entry_height - 2); if let Some(last_blob) = last_blob { last_tick_blob = last_blob; break; diff --git a/tests/replicator.rs b/tests/replicator.rs index f726e08c66..dbba827bd9 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -163,7 +163,7 @@ fn test_replicator_startup() { if let Ok(blobs) = x { for b in blobs { let br = b.read().unwrap(); - assert!(br.index().unwrap() == repair_index); + assert!(br.index() == repair_index); let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap(); info!("entry: {:?}", entry); assert_ne!(entry.id, Hash::default());