Integrate shreds to the replicators (#5711)

* Integrate shreds to the replicators

* fix cuda stuff

* fix cuda tests
Pankaj Garg 2019-08-28 22:34:47 -07:00 committed by GitHub
parent 5a5a6b3840
commit 8adac30c05
6 changed files with 151 additions and 48 deletions

══════ file 1 of 6 ══════

@ -161,7 +161,16 @@ mod test {
         let expected_tick_heights = [5, 6, 7, 8, 8, 9];
         blocktree
-            .write_entries_using_shreds(1, 0, 0, ticks_per_slot, None, true, &entries)
+            .write_entries_using_shreds(
+                1,
+                0,
+                0,
+                ticks_per_slot,
+                None,
+                true,
+                &Arc::new(Keypair::new()),
+                &entries,
+            )
             .unwrap();
         slot_full_sender.send((1, leader_pubkey)).unwrap();
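
Note on the hunk above: the extra `&Arc::new(Keypair::new())` argument reflects a signature change to `write_entries_using_shreds` (visible in the `impl Blocktree` hunks below); the caller now supplies the keypair that signs the shreds instead of the function minting a throwaway one internally. A minimal sketch of the new call shape, with values borrowed from this test:

    // Illustrative only: the signing key is now owned by the caller, so a
    // single keypair can be reused across calls.
    let keypair = Arc::new(Keypair::new());
    blocktree
        .write_entries_using_shreds(1, 0, 0, ticks_per_slot, None, true, &keypair, &entries)
        .unwrap();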

══════ file 2 of 6 ══════

@ -664,6 +664,43 @@ impl Blocktree {
         self.data_shred_cf.get_bytes((slot, index))
     }
 
+    pub fn get_data_shreds(
+        &self,
+        slot: u64,
+        from_index: u64,
+        buffer: &mut [u8],
+    ) -> Result<(u64, usize)> {
+        let meta_cf = self.db.column::<cf::SlotMeta>();
+        let mut buffer_offset = 0;
+        let mut last_index = 0;
+        if let Some(meta) = meta_cf.get(slot)? {
+            if !meta.is_full() {
+                warn!("The slot is not yet full. Will not return any shreds");
+                return Ok((last_index, buffer_offset));
+            }
+            for index in from_index..meta.consumed {
+                if let Some(shred_data) = self.get_data_shred(slot, index)? {
+                    let shred_len = shred_data.len();
+                    if buffer.len().saturating_sub(buffer_offset) >= shred_len {
+                        buffer[buffer_offset..buffer_offset + shred_len]
+                            .copy_from_slice(&shred_data[..shred_len]);
+                        buffer_offset += shred_len;
+                        last_index = index;
+                        // All shreds are of the same length.
+                        // Check whether there is room to accommodate another shred;
+                        // if not, break right away, saving one DB read.
+                        if buffer.len().saturating_sub(buffer_offset) < shred_len {
+                            break;
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            }
+        }
+        Ok((last_index, buffer_offset))
+    }
+
     /// Use this function to write data blobs to blocktree
     pub fn write_shared_blobs<I>(&self, shared_blobs: I) -> Result<()>
     where
@ -752,6 +789,7 @@ impl Blocktree {
         ticks_per_slot: u64,
         parent: Option<u64>,
         is_full_slot: bool,
+        keypair: &Arc<Keypair>,
         entries: I,
     ) -> Result<usize>
     where
@ -774,7 +812,7 @@ impl Blocktree {
             current_slot,
             Some(parent_slot),
             0.0,
-            &Arc::new(Keypair::new()),
+            keypair,
             start_index as u32,
         )
         .expect("Failed to create entry shredder");
@ -2608,6 +2646,7 @@ pub mod tests {
                 ticks_per_slot,
                 Some(i.saturating_sub(1)),
                 true,
+                &Arc::new(Keypair::new()),
                 new_ticks.clone(),
             )
             .unwrap() as u64;
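
The new `get_data_shreds` packs consecutive data shreds of a full slot into a caller-provided buffer and returns `(last_index, bytes_written)`. A hypothetical caller sketch (not part of this commit; `process` is a stand-in for whatever consumes the bytes) showing the resume pattern the chacha code below adopts:

    // Drain a slot's data shreds in buffer-sized batches, resuming after the
    // last index returned by the previous call.
    let mut buffer = [0u8; 8 * 1024];
    let mut start_index = 0;
    loop {
        let (last_index, bytes) = blocktree.get_data_shreds(slot, start_index, &mut buffer)?;
        if bytes == 0 {
            break; // slot not yet full, or no shreds at or after start_index
        }
        process(&buffer[..bytes]); // hypothetical consumer of the packed shreds
        start_index = last_index + 1; // resume after the last shred copied
    }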

══════ file 3 of 6 ══════

@ -475,6 +475,7 @@ pub mod tests {
                 ticks_per_slot,
                 Some(parent_slot),
                 true,
+                &Arc::new(Keypair::new()),
                 &entries,
             )
             .unwrap();
@ -862,7 +863,16 @@ pub mod tests {
         let blocktree =
             Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger");
         blocktree
-            .write_entries_using_shreds(1, 0, 0, genesis_block.ticks_per_slot, None, true, &entries)
+            .write_entries_using_shreds(
+                1,
+                0,
+                0,
+                genesis_block.ticks_per_slot,
+                None,
+                true,
+                &Arc::new(Keypair::new()),
+                &entries,
+            )
             .unwrap();
         let (bank_forks, bank_forks_info, _) =
             process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();

══════ file 4 of 6 ══════

@ -12,7 +12,7 @@ pub const CHACHA_KEY_SIZE: usize = 32;
 pub fn chacha_cbc_encrypt_ledger(
     blocktree: &Arc<Blocktree>,
-    slice: u64,
+    start_slot: u64,
     slots_per_segment: u64,
     out_path: &Path,
     ivec: &mut [u8; CHACHA_BLOCK_SIZE],
@ -23,26 +23,32 @@ pub fn chacha_cbc_encrypt_ledger(
     let mut buffer = [0; BUFFER_SIZE];
     let mut encrypted_buffer = [0; BUFFER_SIZE];
     let key = [0; CHACHA_KEY_SIZE];
-    let mut total_entries = 0;
     let mut total_size = 0;
-    let mut entry = slice;
+    let mut current_slot = start_slot;
+    let mut start_index = 0;
     loop {
-        match blocktree.read_blobs_bytes(0, slots_per_segment - total_entries, &mut buffer, entry) {
-            Ok((num_entries, entry_len)) => {
+        match blocktree.get_data_shreds(current_slot, start_index, &mut buffer) {
+            Ok((last_index, mut size)) => {
                 debug!(
-                    "chacha: encrypting slice: {} num_entries: {} entry_len: {}",
-                    slice, num_entries, entry_len
+                    "chacha: encrypting slice: {} num_shreds: {} data_len: {}",
+                    current_slot,
+                    last_index.saturating_sub(start_index),
+                    size
                 );
-                debug!("read {} bytes", entry_len);
-                let mut size = entry_len as usize;
+                debug!("read {} bytes", size);
                 if size == 0 {
-                    break;
+                    if current_slot.saturating_sub(start_slot) < slots_per_segment {
+                        current_slot += 1;
+                        start_index = 0;
+                        continue;
+                    } else {
+                        break;
+                    }
                 }
                 if size < BUFFER_SIZE {
-                    // We are on the last block, round to the nearest key_size
-                    // boundary
+                    // round to the nearest key_size boundary
                     size = (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1);
                 }
                 total_size += size;
@ -53,8 +59,7 @@ pub fn chacha_cbc_encrypt_ledger(
                     return Err(res);
                 }
-                total_entries += num_entries;
-                entry += num_entries;
+                start_index = last_index + 1;
             }
Err(e) => {
info!("Error encrypting file: {:?}", e);
@ -117,9 +122,22 @@ mod tests {
         let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
         let out_path = Path::new("test_chacha_encrypt_file_output.txt.enc");
+        let seed = [2u8; 32];
+        let mut rnd = GenKeys::new(seed);
+        let keypair = rnd.gen_keypair();
         let entries = make_tiny_deterministic_test_entries(slots_per_segment);
         blocktree
-            .write_entries_using_shreds(0, 0, 0, ticks_per_slot, None, true, &entries)
+            .write_entries_using_shreds(
+                0,
+                0,
+                0,
+                ticks_per_slot,
+                None,
+                true,
+                &Arc::new(keypair),
+                &entries,
+            )
             .unwrap();
 
         let mut key = hex!(
@ -135,7 +153,7 @@ mod tests {
         hasher.hash(&buf[..size]);
 
         // golden needs to be updated if blob stuff changes....
-        let golden: Hash = "GKot5hBsd81kMupNCXHaqbhv3huEbxAFMLnpcX2hniwn"
+        let golden: Hash = "EdYYuAuDPVY7DLNeCtPWAKipicx2KjsxqD2PZ7oxVmHE"
             .parse()
             .unwrap();
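
Two things to note in this file. First, the loop pads a short final read up to a `CHACHA_KEY_SIZE` (32-byte) boundary before encrypting; the mask form works because 32 is a power of two. A self-contained sketch of that arithmetic:

    const CHACHA_KEY_SIZE: usize = 32;
    fn round_to_key_size(size: usize) -> usize {
        // round up to the next multiple of CHACHA_KEY_SIZE
        (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1)
    }
    assert_eq!(round_to_key_size(100), 128); // partial final block padded up
    assert_eq!(round_to_key_size(128), 128); // aligned sizes are unchanged

Second, the golden hash changes because the encrypted ledger bytes are now shreds rather than blobs, and the test derives its keypair deterministically from a fixed seed via `GenKeys`, keeping the golden value reproducible across runs.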

══════ file 5 of 6 ══════

@ -33,54 +33,62 @@ pub fn chacha_cbc_encrypt_file_many_keys(
     ));
     }
 
-    let mut buffer = [0; 8 * 1024];
+    const BUFFER_SIZE: usize = 8 * 1024;
+    let mut buffer = [0; BUFFER_SIZE];
     let num_keys = ivecs.len() / CHACHA_BLOCK_SIZE;
     let mut sha_states = vec![0; num_keys * size_of::<Hash>()];
     let mut int_sha_states = vec![0; num_keys * 112];
     let keys: Vec<u8> = vec![0; num_keys * CHACHA_KEY_SIZE]; // keys not used ATM, uniqueness comes from IV
-    let mut entry = segment;
-    let mut total_entries = 0;
-    let mut total_entry_len = 0;
+    let mut current_slot = segment * slots_per_segment;
+    let mut start_index = 0;
+    let start_slot = current_slot;
+    let mut total_size = 0;
     let mut time: f32 = 0.0;
     unsafe {
         chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32);
     }
     loop {
-        match blocktree.read_blobs_bytes(entry, slots_per_segment - total_entries, &mut buffer, 0) {
-            Ok((num_entries, entry_len)) => {
+        match blocktree.get_data_shreds(current_slot, start_index, &mut buffer) {
+            Ok((last_index, mut size)) => {
                 debug!(
-                    "chacha_cuda: encrypting segment: {} num_entries: {} entry_len: {}",
-                    segment, num_entries, entry_len
+                    "chacha_cuda: encrypting segment: {} num_shreds: {} data_len: {}",
+                    segment,
+                    last_index.saturating_sub(start_index),
+                    size
                 );
-                if num_entries == 0 {
-                    break;
+                if size == 0 {
+                    if current_slot.saturating_sub(start_slot) < slots_per_segment {
+                        current_slot += 1;
+                        start_index = 0;
+                        continue;
+                    } else {
+                        break;
+                    }
                 }
-                let entry_len_usz = entry_len as usize;
+                if size < BUFFER_SIZE {
+                    // round to the nearest key_size boundary
+                    size = (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1);
+                }
                 unsafe {
                     chacha_cbc_encrypt_many_sample(
-                        buffer[..entry_len_usz].as_ptr(),
+                        buffer[..size].as_ptr(),
                         int_sha_states.as_mut_ptr(),
-                        entry_len_usz,
+                        size,
                         keys.as_ptr(),
                         ivecs.as_mut_ptr(),
                         num_keys as u32,
                         samples.as_ptr(),
                         samples.len() as u32,
-                        total_entry_len,
+                        total_size,
                         &mut time,
                     );
                 }
-                total_entry_len += entry_len;
-                total_entries += num_entries;
-                entry += num_entries;
-                debug!(
-                    "total entries: {} entry: {} segment: {} entries_per_segment: {}",
-                    total_entries, entry, segment, slots_per_segment
-                );
-                if (entry - segment) >= slots_per_segment {
-                    break;
-                }
+                total_size += size as u64;
+                start_index = last_index + 1;
             }
}
Err(e) => {
info!("Error encrypting file: {:?}", e);
@ -113,6 +121,7 @@ mod tests {
     use crate::entry::make_tiny_test_entries;
     use crate::replicator::sample_file;
     use solana_sdk::hash::Hash;
+    use solana_sdk::signature::{Keypair, KeypairUtil};
     use solana_sdk::timing::DEFAULT_SLOTS_PER_SEGMENT;
     use std::fs::{remove_dir_all, remove_file};
     use std::path::Path;
@ -130,7 +139,16 @@ mod tests {
         let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
 
         blocktree
-            .write_entries(0, 0, 0, ticks_per_slot, &entries)
+            .write_entries_using_shreds(
+                0,
+                0,
+                0,
+                ticks_per_slot,
+                Some(0),
+                true,
+                &Arc::new(Keypair::new()),
+                &entries,
+            )
             .unwrap();
 
         let out_path = Path::new("test_chacha_encrypt_file_many_keys_single_output.txt.enc");
@ -178,7 +196,16 @@ mod tests {
         let ticks_per_slot = 16;
         let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
         blocktree
-            .write_entries(0, 0, 0, ticks_per_slot, &entries)
+            .write_entries_using_shreds(
+                0,
+                0,
+                0,
+                ticks_per_slot,
+                Some(0),
+                true,
+                &Arc::new(Keypair::new()),
+                &entries,
+            )
             .unwrap();
 
         let out_path = Path::new("test_chacha_encrypt_file_many_keys_multiple_output.txt.enc");
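
The CUDA path now walks the ledger the same way as `chacha_cbc_encrypt_ledger`: `current_slot` starts at `segment * slots_per_segment`, an empty read advances to the next slot, and the loop ends once the segment's slot budget is exhausted. A hedged sketch of that segment-to-slot mapping (hypothetical helper, not part of the commit):

    fn segment_slots(segment: u64, slots_per_segment: u64) -> std::ops::Range<u64> {
        let start_slot = segment * slots_per_segment;
        start_slot..start_slot + slots_per_segment // half-open range of slots
    }
    // e.g. with 16 slots per segment, segment 2 covers slots 32..48
    assert_eq!(segment_slots(2, 16), 32..48);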

══════ file 6 of 6 ══════

@ -74,13 +74,13 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
 }
 
 #[test]
-#[ignore]
+#[serial]
 fn test_replicator_startup_1_node() {
     run_replicator_startup_basic(1, 1);
 }
 
 #[test]
-#[ignore]
+#[serial]
 fn test_replicator_startup_2_nodes() {
     run_replicator_startup_basic(2, 1);
 }
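
Swapping `#[ignore]` for `#[serial]` re-enables these replicator tests while keeping them from running concurrently with each other. A minimal sketch, assuming the attribute is the `serial` macro from the `serial_test`/`serial_test_derive` crates (the import is not shown in this diff):

    // Assumption: #[serial] tests take a shared lock so only one runs at a
    // time, which suits tests that share machine-wide resources.
    use serial_test_derive::serial;

    #[test]
    #[serial]
    fn exclusive_test() {
        // runs serialized with respect to other #[serial] tests
    }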