diff --git a/benches/db_ledger.rs b/benches/db_ledger.rs
index 0dc35b99b7..1e55c543d4 100644
--- a/benches/db_ledger.rs
+++ b/benches/db_ledger.rs
@@ -5,26 +5,30 @@ extern crate test;
 
 use rand::seq::SliceRandom;
 use rand::{thread_rng, Rng};
-use solana::db_ledger::{DataCf, DbLedger, LedgerColumnFamilyRaw};
+use solana::db_ledger::DbLedger;
 use solana::ledger::{get_tmp_ledger_path, make_large_test_entries, make_tiny_test_entries, Block};
 use solana::packet::{Blob, BLOB_HEADER_SIZE};
 use test::Bencher;
 
 // Given some blobs and a ledger at ledger_path, benchmark writing the blobs to the ledger
-fn bench_write_blobs(bench: &mut Bencher, blobs: &mut [&mut Blob], ledger_path: &str) {
+fn bench_write_blobs(bench: &mut Bencher, blobs: &mut Vec<Blob>, ledger_path: &str) {
     let db_ledger =
         DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
-    let slot = 0;
+    let num_blobs = blobs.len();
+
     bench.iter(move || {
         for blob in blobs.iter_mut() {
             let index = blob.index().unwrap();
-            let key = DataCf::key(slot, index);
-            let size = blob.size().unwrap();
+
             db_ledger
-                .data_cf
-                .put(&key, &blob.data[..BLOB_HEADER_SIZE + size])
+                .put_data_blob_bytes(
+                    blob.slot().unwrap(),
+                    index,
+                    &blob.data[..BLOB_HEADER_SIZE + blob.size().unwrap()],
+                )
                 .unwrap();
+            blob.set_index(index + num_blobs as u64).unwrap();
         }
     });
@@ -44,12 +48,13 @@ fn setup_read_bench(
     entries.extend(make_tiny_test_entries(num_small_blobs as usize));
 
     // Convert the entries to blobs, write the blobs to the ledger
-    let shared_blobs = entries.to_shared_blobs();
-    for b in shared_blobs.iter() {
-        b.write().unwrap().set_slot(slot).unwrap();
+    let mut blobs = entries.to_blobs();
+    for (index, b) in blobs.iter_mut().enumerate() {
+        b.set_index(index as u64).unwrap();
+        b.set_slot(slot).unwrap();
     }
     db_ledger
-        .write_shared_blobs(&shared_blobs)
+        .write_blobs(&blobs)
         .expect("Expectd successful insertion of blobs into ledger");
 }
 
@@ -60,9 +65,10 @@ fn bench_write_small(bench: &mut Bencher) {
     let ledger_path = get_tmp_ledger_path("bench_write_small");
     let num_entries = 32 * 1024;
     let entries = make_tiny_test_entries(num_entries);
-    let shared_blobs = entries.to_shared_blobs();
-    let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
-    let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
+    let mut blobs = entries.to_blobs();
+    for (index, b) in blobs.iter_mut().enumerate() {
+        b.set_index(index as u64).unwrap();
+    }
     bench_write_blobs(bench, &mut blobs, &ledger_path);
 }
 
@@ -72,10 +78,12 @@ fn bench_write_big(bench: &mut Bencher) {
     let ledger_path = get_tmp_ledger_path("bench_write_big");
     let num_entries = 32 * 1024;
-    let entries = make_tiny_test_entries(num_entries);
-    let shared_blobs = entries.to_shared_blobs();
-    let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
-    let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
+    let entries = make_large_test_entries(num_entries);
+    let mut blobs = entries.to_blobs();
+    for (index, b) in blobs.iter_mut().enumerate() {
+        b.set_index(index as u64).unwrap();
+    }
+
     bench_write_blobs(bench, &mut blobs, &ledger_path);
 }
 
@@ -99,9 +107,7 @@ fn bench_read_sequential(bench: &mut Bencher) {
         // Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially
         let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs);
         for i in start_index..start_index + num_reads {
-            let _ = db_ledger
-                .data_cf
-                .get_by_slot_index(slot, i as u64 % total_blobs);
+            let _ = db_ledger.get_data_blob(slot, i as u64 % total_blobs);
         }
     });
 
@@ -132,7 +138,7 @@ fn bench_read_random(bench: &mut Bencher) {
         .collect();
     bench.iter(move || {
         for i in indexes.iter() {
-            let _ = db_ledger.data_cf.get_by_slot_index(slot, *i as u64);
+            let _ = db_ledger.get_data_blob(slot, *i as u64);
         }
     });
 
@@ -147,18 +153,16 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) {
         DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
     let num_entries = 32 * 1024;
     let entries = make_tiny_test_entries(num_entries);
-    let mut shared_blobs = entries.to_shared_blobs();
-    shared_blobs.shuffle(&mut thread_rng());
+    let mut blobs = entries.to_blobs();
+
+    blobs.shuffle(&mut thread_rng());
 
     bench.iter(move || {
-        for blob in shared_blobs.iter_mut() {
-            let index = blob.read().unwrap().index().unwrap();
-            db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
-            blob.write()
-                .unwrap()
-                .set_index(index + num_entries as u64)
-                .unwrap();
+        for blob in blobs.iter_mut() {
+            let index = blob.index().unwrap();
+            blob.set_index(index + num_entries as u64).unwrap();
         }
+        db_ledger.write_blobs(&blobs).unwrap();
     });
 
     DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs
index c3c43dec48..9a5ea01685 100644
--- a/src/broadcast_service.rs
+++ b/src/broadcast_service.rs
@@ -479,10 +479,7 @@ mod test {
                 .unwrap()
                 .get_scheduled_leader(start_tick_height + i + 1)
                 .expect("Leader should exist");
-            let result = db_ledger
-                .data_cf
-                .get_by_slot_index(slot, entry_height + i)
-                .unwrap();
+            let result = db_ledger.get_data_blob(slot, entry_height + i).unwrap();
 
             assert!(result.is_some());
         }
diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index 76eabbccd2..4b669ad45f 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -877,13 +877,10 @@ impl ClusterInfo {
         let max_slot = meta.received_slot;
         // Try to find the requested index in one of the slots
         for i in 0..=max_slot {
-            let get_result = db_ledger.data_cf.get_by_slot_index(i, ix);
+            let blob = db_ledger.get_data_blob(i, ix);
 
-            if let Ok(Some(blob_data)) = get_result {
+            if let Ok(Some(mut blob)) = blob {
                 inc_new_counter_info!("cluster_info-window-request-ledger", 1);
-                let mut blob = Blob::new(&blob_data);
-                blob.set_index(ix).expect("set_index()");
-                blob.set_id(&me.id).expect("set_id()");
                 // causes retransmission if I'm the leader
                 blob.meta.set_addr(from_addr);
                 return vec![Arc::new(RwLock::new(blob))];
diff --git a/src/db_ledger.rs b/src/db_ledger.rs
index 2336a0638a..08b3142eba 100644
--- a/src/db_ledger.rs
+++ b/src/db_ledger.rs
@@ -12,7 +12,7 @@ use serde::de::DeserializeOwned;
 use serde::Serialize;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use std::borrow::Borrow;
-use std::cmp::max;
+use std::cmp;
 use std::fs::create_dir_all;
 use std::io;
 use std::path::Path;
@@ -276,7 +276,7 @@
 pub struct DbLedger {
     // Underlying database is automatically closed in the Drop implementation of DB
     db: Arc<DB>,
     meta_cf: MetaCf,
-    pub data_cf: DataCf,
+    data_cf: DataCf,
     pub erasure_cf: ErasureCf,
 }
@@ -547,8 +547,8 @@
             let last = blobs.last().unwrap().read().unwrap();
             meta.consumed = last.index()? + 1;
             meta.consumed_slot = last.slot()?;
-            meta.received = max(meta.received, last.index()? + 1);
-            meta.received_slot = max(meta.received_slot, last.index()?);
+            meta.received = cmp::max(meta.received, last.index()? + 1);
+            meta.received_slot = cmp::max(meta.received_slot, last.index()?);
         }
 
         let mut batch = WriteBatch::default();
@@ -634,6 +634,33 @@
         Ok(EntryIterator { db_iterator })
     }
 
+    pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
+        self.erasure_cf.get_by_slot_index(slot, index)
+    }
+    pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
+        self.erasure_cf.delete_by_slot_index(slot, index)
+    }
+    pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
+        self.data_cf.get_by_slot_index(slot, index)
+    }
+    pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
+        self.erasure_cf.put_by_slot_index(slot, index, bytes)
+    }
+
+    pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
+        self.data_cf.put_by_slot_index(slot, index, bytes)
+    }
+
+    pub fn get_data_blob(&self, slot: u64, index: u64) -> Result<Option<Blob>> {
+        let bytes = self.get_data_blob_bytes(slot, index)?;
+        Ok(bytes.map(|bytes| {
+            let blob = Blob::new(&bytes);
+            assert!(blob.slot().unwrap() == slot);
+            assert!(blob.index().unwrap() == index);
+            blob
+        }))
+    }
+
     pub fn get_entries_bytes(
         &self,
         _start_index: u64,
@@ -643,6 +670,99 @@
         Err(io::Error::new(io::ErrorKind::Other, "TODO"))
     }
 
+    // Given a start and end entry index, find all the missing
+    // indexes in the ledger in the range [start_index, end_index)
+    fn find_missing_indexes(
+        db_iterator: &mut DbLedgerRawIterator,
+        slot: u64,
+        start_index: u64,
+        end_index: u64,
+        key: &dyn Fn(u64, u64) -> Vec<u8>,
+        index_from_key: &dyn Fn(&[u8]) -> Result<u64>,
+        max_missing: usize,
+    ) -> Vec<u64> {
+        if start_index >= end_index || max_missing == 0 {
+            return vec![];
+        }
+
+        let mut missing_indexes = vec![];
+
+        // Seek to the first blob with index >= start_index
+        db_iterator.seek(&key(slot, start_index));
+
+        // The index of the first missing blob in the slot
+        let mut prev_index = start_index;
+        'outer: loop {
+            if !db_iterator.valid() {
+                for i in prev_index..end_index {
+                    missing_indexes.push(i);
+                    if missing_indexes.len() == max_missing {
+                        break;
+                    }
+                }
+                break;
+            }
+            let current_key = db_iterator.key().expect("Expect a valid key");
+            let current_index = index_from_key(&current_key)
+                .expect("Expect to be able to parse index from valid key");
+            let upper_index = cmp::min(current_index, end_index);
+            for i in prev_index..upper_index {
+                missing_indexes.push(i);
+                if missing_indexes.len() == max_missing {
+                    break 'outer;
+                }
+            }
+            if current_index >= end_index {
+                break;
+            }
+
+            prev_index = current_index + 1;
+            db_iterator.next();
+        }
+
+        missing_indexes
+    }
+
+    pub fn find_missing_data_indexes(
+        &self,
+        slot: u64,
+        start_index: u64,
+        end_index: u64,
+        max_missing: usize,
+    ) -> Vec<u64> {
+        let mut db_iterator = self.data_cf.raw_iterator();
+
+        Self::find_missing_indexes(
+            &mut db_iterator,
+            slot,
+            start_index,
+            end_index,
+            &DataCf::key,
+            &DataCf::index_from_key,
+            max_missing,
+        )
+    }
+
+    pub fn find_missing_coding_indexes(
+        &self,
+        slot: u64,
+        start_index: u64,
+        end_index: u64,
+        max_missing: usize,
+    ) -> Vec<u64> {
+        let mut db_iterator = self.erasure_cf.raw_iterator();
+
+        Self::find_missing_indexes(
+            &mut db_iterator,
+            slot,
+            start_index,
+            end_index,
+            &ErasureCf::key,
+            &ErasureCf::index_from_key,
+            max_missing,
+        )
+    }
+
     fn get_cf_options() -> Options {
         let mut options = Options::default();
         options.set_max_write_buffer_number(32);
diff --git a/src/db_window.rs b/src/db_window.rs
index 0c8edcf292..c1cae642c5 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -82,9 +82,8 @@ pub fn repair(
         max_entry_height + 2
     };
 
-    let idxs = find_missing_data_indexes(
+    let idxs = db_ledger.find_missing_data_indexes(
         DEFAULT_SLOT_HEIGHT,
-        db_ledger,
         consumed,
         max_repair_entry_height - 1,
         MAX_REPAIR_LENGTH,
@@ -117,99 +116,6 @@ pub fn repair(
     Ok(reqs)
 }
 
-// Given a start and end entry index, find all the missing
-// indexes in the ledger in the range [start_index, end_index)
-pub fn find_missing_indexes(
-    db_iterator: &mut DbLedgerRawIterator,
-    slot: u64,
-    start_index: u64,
-    end_index: u64,
-    key: &dyn Fn(u64, u64) -> Vec<u8>,
-    index_from_key: &dyn Fn(&[u8]) -> Result<u64>,
-    max_missing: usize,
-) -> Vec<u64> {
-    if start_index >= end_index || max_missing == 0 {
-        return vec![];
-    }
-
-    let mut missing_indexes = vec![];
-
-    // Seek to the first blob with index >= start_index
-    db_iterator.seek(&key(slot, start_index));
-
-    // The index of the first missing blob in the slot
-    let mut prev_index = start_index;
-    'outer: loop {
-        if !db_iterator.valid() {
-            for i in prev_index..end_index {
-                missing_indexes.push(i);
-                if missing_indexes.len() == max_missing {
-                    break;
-                }
-            }
-            break;
-        }
-        let current_key = db_iterator.key().expect("Expect a valid key");
-        let current_index =
-            index_from_key(&current_key).expect("Expect to be able to parse index from valid key");
-        let upper_index = cmp::min(current_index, end_index);
-        for i in prev_index..upper_index {
-            missing_indexes.push(i);
-            if missing_indexes.len() == max_missing {
-                break 'outer;
-            }
-        }
-        if current_index >= end_index {
-            break;
-        }
-
-        prev_index = current_index + 1;
-        db_iterator.next();
-    }
-
-    missing_indexes
-}
-
-pub fn find_missing_data_indexes(
-    slot: u64,
-    db_ledger: &DbLedger,
-    start_index: u64,
-    end_index: u64,
-    max_missing: usize,
-) -> Vec<u64> {
-    let mut db_iterator = db_ledger.data_cf.raw_iterator();
-
-    find_missing_indexes(
-        &mut db_iterator,
-        slot,
-        start_index,
-        end_index,
-        &DataCf::key,
-        &DataCf::index_from_key,
-        max_missing,
-    )
-}
-
-pub fn find_missing_coding_indexes(
-    slot: u64,
-    db_ledger: &DbLedger,
-    start_index: u64,
-    end_index: u64,
-    max_missing: usize,
-) -> Vec<u64> {
-    let mut db_iterator = db_ledger.erasure_cf.raw_iterator();
-
-    find_missing_indexes(
-        &mut db_iterator,
-        slot,
-        start_index,
-        end_index,
-        &ErasureCf::key,
-        &ErasureCf::index_from_key,
-        max_missing,
-    )
-}
-
 pub fn retransmit_all_leader_blocks(
     dq: &[SharedBlob],
     leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@@ -402,7 +308,6 @@ mod test {
     use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
     use crate::streamer::{receiver, responder, PacketReceiver};
     use solana_sdk::signature::{Keypair, KeypairUtil};
-    use std::borrow::Borrow;
     use std::io;
     use std::io::Write;
     use std::net::UdpSocket;
@@ -528,37 +433,35 @@ mod test {
 
         // Early exit conditions
        let empty: Vec<u64> = vec![];
-        assert_eq!(find_missing_data_indexes(slot, &db_ledger, 0, 0, 1), empty);
-        assert_eq!(find_missing_data_indexes(slot, &db_ledger, 5, 5, 1), empty);
-        assert_eq!(find_missing_data_indexes(slot, &db_ledger, 4, 3, 1), empty);
-        assert_eq!(find_missing_data_indexes(slot, &db_ledger, 1, 2, 0), empty);
+        assert_eq!(db_ledger.find_missing_data_indexes(slot, 0, 0, 1), empty);
+        assert_eq!(db_ledger.find_missing_data_indexes(slot, 5, 5, 1), empty);
+        assert_eq!(db_ledger.find_missing_data_indexes(slot, 4, 3, 1), empty);
+        assert_eq!(db_ledger.find_missing_data_indexes(slot, 1, 2, 0), empty);
 
-        let shared_blob = &make_tiny_test_entries(1).to_shared_blobs()[0];
-        let first_index = 10;
-        {
-            let mut bl = shared_blob.write().unwrap();
-            bl.set_index(10).unwrap();
-            bl.set_slot(slot).unwrap();
-        }
+        let mut blobs = make_tiny_test_entries(2).to_blobs();
+
+        const ONE: u64 = 1;
+        const OTHER: u64 = 4;
+
+        blobs[0].set_index(ONE).unwrap();
+        blobs[1].set_index(OTHER).unwrap();
 
         // Insert one blob at index = first_index
-        db_ledger
-            .write_blobs(&vec![(*shared_blob.read().unwrap()).borrow()])
-            .unwrap();
+        db_ledger.write_blobs(&blobs).unwrap();
+
+        const STARTS: u64 = OTHER * 2;
+        const END: u64 = OTHER * 3;
+        const MAX: usize = 10;
 
         // The first blob has index = first_index. Thus, for i < first_index,
         // given the input range of [i, first_index], the missing indexes should be
         // [i, first_index - 1]
-        for i in 0..first_index {
-            let result = find_missing_data_indexes(
-                slot,
-                &db_ledger,
-                i,
-                first_index,
-                (first_index - i) as usize,
+        for start in 0..STARTS {
+            let result = db_ledger.find_missing_data_indexes(
+                slot, start, // start
+                END,   //end
+                MAX,   //max
             );
-            let expected: Vec<u64> = (i..first_index).collect();
-
+            let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
             assert_eq!(result, expected);
         }
 
@@ -589,27 +492,27 @@
         // range of [0, gap)
         let expected: Vec<u64> = (1..gap).collect();
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, 0, gap, gap as usize),
+            db_ledger.find_missing_data_indexes(slot, 0, gap, gap as usize),
             expected
         );
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, 1, gap, (gap - 1) as usize),
+            db_ledger.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
             expected,
         );
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, 0, gap - 1, (gap - 1) as usize),
+            db_ledger.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
             &expected[..expected.len() - 1],
         );
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, gap - 2, gap, gap as usize),
+            db_ledger.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
             vec![gap - 2, gap - 1],
         );
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, gap - 2, gap, 1),
+            db_ledger.find_missing_data_indexes(slot, gap - 2, gap, 1),
             vec![gap - 2],
         );
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, 0, gap, 1),
+            db_ledger.find_missing_data_indexes(slot, 0, gap, 1),
             vec![1],
         );
 
@@ -617,11 +520,11 @@
         let mut expected: Vec<u64> = (1..gap).collect();
         expected.push(gap + 1);
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap + 2) as usize),
+            db_ledger.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
             expected,
         );
         assert_eq!(
-            find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap - 1) as usize),
+            db_ledger.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
             &expected[..expected.len() - 1],
         );
 
@@ -635,9 +538,8 @@
                 })
                 .collect();
             assert_eq!(
-                find_missing_data_indexes(
+                db_ledger.find_missing_data_indexes(
                     slot,
-                    &db_ledger,
                     j * gap,
                     i * gap,
                     ((i - j) * gap) as usize
@@ -675,7 +577,7 @@
         for i in 0..num_entries as u64 {
             for j in 0..i {
                 assert_eq!(
-                    find_missing_data_indexes(slot, &db_ledger, j, i, (i - j) as usize),
+                    db_ledger.find_missing_data_indexes(slot, j, i, (i - j) as usize),
                     empty
                 );
             }
diff --git a/src/erasure.rs b/src/erasure.rs
index fb423cf980..8ce5b22b6f 100644
--- a/src/erasure.rs
+++ b/src/erasure.rs
@@ -1,6 +1,5 @@
 // Support erasure coding
 use crate::db_ledger::DbLedger;
-use crate::db_window::{find_missing_coding_indexes, find_missing_data_indexes};
 use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
 use crate::result::{Error, Result};
 use crate::window::WindowSlot;
@@ -367,16 +366,12 @@ pub fn recover(
         block_end_idx
     );
 
-    let data_missing =
-        find_missing_data_indexes(slot, &db_ledger, block_start_idx, block_end_idx, NUM_DATA).len();
-    let coding_missing = find_missing_coding_indexes(
-        slot,
-        &db_ledger,
-        coding_start_idx,
-        block_end_idx,
-        NUM_CODING,
-    )
-    .len();
+    let data_missing = db_ledger
+        .find_missing_data_indexes(slot, block_start_idx, block_end_idx, NUM_DATA)
+        .len();
+    let coding_missing = db_ledger
+        .find_missing_coding_indexes(slot, coding_start_idx, block_end_idx, NUM_CODING)
+        .len();
 
     // if we're not missing data, or if we have too much missing but have enough coding
     if data_missing == 0 {
@@ -410,7 +405,7 @@ pub fn recover(
 
     // Add the data blobs we have into the recovery vector, mark the missing ones
     for i in block_start_idx..block_end_idx {
-        let result = db_ledger.data_cf.get_by_slot_index(slot, i)?;
+        let result = db_ledger.get_data_blob_bytes(slot, i)?;
 
         categorize_blob(
             &result,
@@ -628,13 +623,12 @@ pub mod test {
             // If we're using gibberish blobs, skip validation checks and insert
             // directly into the ledger
             if use_random {
-                let data_l = data.read().unwrap();
+                let data = data.read().unwrap();
                 db_ledger
-                    .data_cf
-                    .put_by_slot_index(
-                        data_l.slot().unwrap(),
-                        data_l.index().unwrap(),
-                        &data_l.data[..data_l.data_size().unwrap() as usize],
+                    .put_data_blob_bytes(
+                        data.slot().unwrap(),
+                        data.index().unwrap(),
+                        &data.data[..data.data_size().unwrap() as usize],
                     )
                     .expect("Expected successful put into data column of ledger");
             } else {
diff --git a/src/lib.rs b/src/lib.rs
index c614fbd75d..168afbf112 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -93,7 +93,7 @@ use solana_jsonrpc_http_server as jsonrpc_http_server;
 extern crate solana_jsonrpc_macros as jsonrpc_macros;
 use solana_jsonrpc_pubsub as jsonrpc_pubsub;
 use solana_jsonrpc_ws_server as jsonrpc_ws_server;
-use solana_vote_signer;
+//use solana_vote_signer;
 
 #[cfg(test)]
 #[macro_use]
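
Net effect of the patch: callers stop reaching into the `data_cf`/`erasure_cf` column families directly and go through the new `DbLedger` accessors (`put_data_blob_bytes`, `get_data_blob`, `find_missing_data_indexes`, and friends), and the gap-scanning helpers move from `db_window.rs` into `DbLedger` itself. Below is a minimal usage sketch of the resulting surface, modeled on the benches and tests above; the slot/index values and the standalone `main` wrapper are illustrative, not part of the patch:

```rust
use solana::db_ledger::DbLedger;
use solana::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};

fn main() {
    let ledger_path = get_tmp_ledger_path("db_ledger_api_sketch");
    let db_ledger =
        DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");

    // Write four blobs into slot 0, deliberately skipping index 2.
    let mut blobs = make_tiny_test_entries(4).to_blobs();
    for (i, blob) in blobs.iter_mut().enumerate() {
        let index = if i < 2 { i as u64 } else { i as u64 + 1 };
        blob.set_index(index).unwrap();
        blob.set_slot(0).unwrap();
    }
    db_ledger.write_blobs(&blobs).unwrap();

    // Reads go through the encapsulated accessor instead of data_cf;
    // get_data_blob re-checks the blob's own slot/index against the key.
    let blob = db_ledger.get_data_blob(0, 1).unwrap();
    assert!(blob.is_some());

    // The relocated gap finder reports the hole left at index 2.
    assert_eq!(db_ledger.find_missing_data_indexes(0, 0, 5, 10), vec![2]);

    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
```

With `data_cf` made private, the slot/index key layout stays an implementation detail of `db_ledger.rs` rather than leaking into `cluster_info.rs`, `erasure.rs`, and the benches.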