diff --git a/core/src/blockstream_service.rs b/core/src/blockstream_service.rs index 3fe607251d..6c7fcfe52a 100644 --- a/core/src/blockstream_service.rs +++ b/core/src/blockstream_service.rs @@ -161,7 +161,16 @@ mod test { let expected_tick_heights = [5, 6, 7, 8, 8, 9]; blocktree - .write_entries_using_shreds(1, 0, 0, ticks_per_slot, None, true, &entries) + .write_entries_using_shreds( + 1, + 0, + 0, + ticks_per_slot, + None, + true, + &Arc::new(Keypair::new()), + &entries, + ) .unwrap(); slot_full_sender.send((1, leader_pubkey)).unwrap(); diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 0a8512bf94..58d7a2496d 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -664,6 +664,43 @@ impl Blocktree { self.data_shred_cf.get_bytes((slot, index)) } + pub fn get_data_shreds( + &self, + slot: u64, + from_index: u64, + buffer: &mut [u8], + ) -> Result<(u64, usize)> { + let meta_cf = self.db.column::(); + let mut buffer_offset = 0; + let mut last_index = 0; + if let Some(meta) = meta_cf.get(slot)? { + if !meta.is_full() { + warn!("The slot is not yet full. Will not return any shreds"); + return Ok((last_index, buffer_offset)); + } + for index in from_index..meta.consumed { + if let Some(shred_data) = self.get_data_shred(slot, index)? { + let shred_len = shred_data.len(); + if buffer.len().saturating_sub(buffer_offset) >= shred_len { + buffer[buffer_offset..buffer_offset + shred_len] + .copy_from_slice(&shred_data[..shred_len]); + buffer_offset += shred_len; + last_index = index; + // All shreds are of the same length. + // Let's check if we have scope to accomodate another shred + // If not, let's break right away, as it'll save on 1 DB read + if buffer.len().saturating_sub(buffer_offset) < shred_len { + break; + } + } else { + break; + } + } + } + } + Ok((last_index, buffer_offset)) + } + /// Use this function to write data blobs to blocktree pub fn write_shared_blobs(&self, shared_blobs: I) -> Result<()> where @@ -752,6 +789,7 @@ impl Blocktree { ticks_per_slot: u64, parent: Option, is_full_slot: bool, + keypair: &Arc, entries: I, ) -> Result where @@ -774,7 +812,7 @@ impl Blocktree { current_slot, Some(parent_slot), 0.0, - &Arc::new(Keypair::new()), + keypair, start_index as u32, ) .expect("Failed to create entry shredder"); @@ -2608,6 +2646,7 @@ pub mod tests { ticks_per_slot, Some(i.saturating_sub(1)), true, + &Arc::new(Keypair::new()), new_ticks.clone(), ) .unwrap() as u64; diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index e1aa6f4bf0..d830b900bf 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -475,6 +475,7 @@ pub mod tests { ticks_per_slot, Some(parent_slot), true, + &Arc::new(Keypair::new()), &entries, ) .unwrap(); @@ -862,7 +863,16 @@ pub mod tests { let blocktree = Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger"); blocktree - .write_entries_using_shreds(1, 0, 0, genesis_block.ticks_per_slot, None, true, &entries) + .write_entries_using_shreds( + 1, + 0, + 0, + genesis_block.ticks_per_slot, + None, + true, + &Arc::new(Keypair::new()), + &entries, + ) .unwrap(); let (bank_forks, bank_forks_info, _) = process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap(); diff --git a/core/src/chacha.rs b/core/src/chacha.rs index 72c0516c84..44401771b6 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -12,7 +12,7 @@ pub const CHACHA_KEY_SIZE: usize = 32; pub fn chacha_cbc_encrypt_ledger( blocktree: &Arc, - slice: u64, + start_slot: u64, slots_per_segment: u64, out_path: &Path, ivec: &mut [u8; CHACHA_BLOCK_SIZE], @@ -23,26 +23,32 @@ pub fn chacha_cbc_encrypt_ledger( let mut buffer = [0; BUFFER_SIZE]; let mut encrypted_buffer = [0; BUFFER_SIZE]; let key = [0; CHACHA_KEY_SIZE]; - let mut total_entries = 0; let mut total_size = 0; - let mut entry = slice; - + let mut current_slot = start_slot; + let mut start_index = 0; loop { - match blocktree.read_blobs_bytes(0, slots_per_segment - total_entries, &mut buffer, entry) { - Ok((num_entries, entry_len)) => { + match blocktree.get_data_shreds(current_slot, start_index, &mut buffer) { + Ok((last_index, mut size)) => { debug!( - "chacha: encrypting slice: {} num_entries: {} entry_len: {}", - slice, num_entries, entry_len + "chacha: encrypting slice: {} num_shreds: {} data_len: {}", + current_slot, + last_index.saturating_sub(start_index), + size ); - debug!("read {} bytes", entry_len); - let mut size = entry_len as usize; + debug!("read {} bytes", size); + if size == 0 { - break; + if current_slot.saturating_sub(start_slot) < slots_per_segment { + current_slot += 1; + start_index = 0; + continue; + } else { + break; + } } if size < BUFFER_SIZE { - // We are on the last block, round to the nearest key_size - // boundary + // round to the nearest key_size boundary size = (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1); } total_size += size; @@ -53,8 +59,7 @@ pub fn chacha_cbc_encrypt_ledger( return Err(res); } - total_entries += num_entries; - entry += num_entries; + start_index = last_index + 1; } Err(e) => { info!("Error encrypting file: {:?}", e); @@ -117,9 +122,22 @@ mod tests { let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); let out_path = Path::new("test_chacha_encrypt_file_output.txt.enc"); + let seed = [2u8; 32]; + let mut rnd = GenKeys::new(seed); + let keypair = rnd.gen_keypair(); + let entries = make_tiny_deterministic_test_entries(slots_per_segment); blocktree - .write_entries_using_shreds(0, 0, 0, ticks_per_slot, None, true, &entries) + .write_entries_using_shreds( + 0, + 0, + 0, + ticks_per_slot, + None, + true, + &Arc::new(keypair), + &entries, + ) .unwrap(); let mut key = hex!( @@ -135,7 +153,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "GKot5hBsd81kMupNCXHaqbhv3huEbxAFMLnpcX2hniwn" + let golden: Hash = "EdYYuAuDPVY7DLNeCtPWAKipicx2KjsxqD2PZ7oxVmHE" .parse() .unwrap(); diff --git a/core/src/chacha_cuda.rs b/core/src/chacha_cuda.rs index 731955fd36..78906b8278 100644 --- a/core/src/chacha_cuda.rs +++ b/core/src/chacha_cuda.rs @@ -33,54 +33,62 @@ pub fn chacha_cbc_encrypt_file_many_keys( )); } - let mut buffer = [0; 8 * 1024]; + const BUFFER_SIZE: usize = 8 * 1024; + let mut buffer = [0; BUFFER_SIZE]; let num_keys = ivecs.len() / CHACHA_BLOCK_SIZE; let mut sha_states = vec![0; num_keys * size_of::()]; let mut int_sha_states = vec![0; num_keys * 112]; let keys: Vec = vec![0; num_keys * CHACHA_KEY_SIZE]; // keys not used ATM, uniqueness comes from IV - let mut entry = segment; - let mut total_entries = 0; - let mut total_entry_len = 0; + let mut current_slot = segment * slots_per_segment; + let mut start_index = 0; + let start_slot = current_slot; + let mut total_size = 0; let mut time: f32 = 0.0; unsafe { chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32); } loop { - match blocktree.read_blobs_bytes(entry, slots_per_segment - total_entries, &mut buffer, 0) { - Ok((num_entries, entry_len)) => { + match blocktree.get_data_shreds(current_slot, start_index, &mut buffer) { + Ok((last_index, mut size)) => { debug!( - "chacha_cuda: encrypting segment: {} num_entries: {} entry_len: {}", - segment, num_entries, entry_len + "chacha_cuda: encrypting segment: {} num_shreds: {} data_len: {}", + segment, + last_index.saturating_sub(start_index), + size ); - if num_entries == 0 { - break; + + if size == 0 { + if current_slot.saturating_sub(start_slot) < slots_per_segment { + current_slot += 1; + start_index = 0; + continue; + } else { + break; + } } - let entry_len_usz = entry_len as usize; + + if size < BUFFER_SIZE { + // round to the nearest key_size boundary + size = (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1); + } + unsafe { chacha_cbc_encrypt_many_sample( - buffer[..entry_len_usz].as_ptr(), + buffer[..size].as_ptr(), int_sha_states.as_mut_ptr(), - entry_len_usz, + size, keys.as_ptr(), ivecs.as_mut_ptr(), num_keys as u32, samples.as_ptr(), samples.len() as u32, - total_entry_len, + total_size, &mut time, ); } - total_entry_len += entry_len; - total_entries += num_entries; - entry += num_entries; - debug!( - "total entries: {} entry: {} segment: {} entries_per_segment: {}", - total_entries, entry, segment, slots_per_segment - ); - if (entry - segment) >= slots_per_segment { - break; - } + total_size += size as u64; + start_index = last_index + 1; } Err(e) => { info!("Error encrypting file: {:?}", e); @@ -113,6 +121,7 @@ mod tests { use crate::entry::make_tiny_test_entries; use crate::replicator::sample_file; use solana_sdk::hash::Hash; + use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::DEFAULT_SLOTS_PER_SEGMENT; use std::fs::{remove_dir_all, remove_file}; use std::path::Path; @@ -130,7 +139,16 @@ mod tests { let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); blocktree - .write_entries(0, 0, 0, ticks_per_slot, &entries) + .write_entries_using_shreds( + 0, + 0, + 0, + ticks_per_slot, + Some(0), + true, + &Arc::new(Keypair::new()), + &entries, + ) .unwrap(); let out_path = Path::new("test_chacha_encrypt_file_many_keys_single_output.txt.enc"); @@ -178,7 +196,16 @@ mod tests { let ticks_per_slot = 16; let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); blocktree - .write_entries(0, 0, 0, ticks_per_slot, &entries) + .write_entries_using_shreds( + 0, + 0, + 0, + ticks_per_slot, + Some(0), + true, + &Arc::new(Keypair::new()), + &entries, + ) .unwrap(); let out_path = Path::new("test_chacha_encrypt_file_many_keys_multiple_output.txt.enc"); diff --git a/local_cluster/tests/replicator.rs b/local_cluster/tests/replicator.rs index a6addd107b..e1e5537e2f 100644 --- a/local_cluster/tests/replicator.rs +++ b/local_cluster/tests/replicator.rs @@ -74,13 +74,13 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) { } #[test] -#[ignore] +#[serial] fn test_replicator_startup_1_node() { run_replicator_startup_basic(1, 1); } #[test] -#[ignore] +#[serial] fn test_replicator_startup_2_nodes() { run_replicator_startup_basic(2, 1); }