Multiple entries per blob (#3337)

* Pack multiple entries into blob

* fix tests

* Add test for deserializing multi-entry blobs in blocktree

* more test fixes
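
The gist of the change: a blob's payload after BLOB_HEADER_SIZE is now a bincode-serialized Vec<Entry> rather than a single Entry, so writers pack as many entries as fit into one blob and readers deserialize a batch and flatten. A minimal, self-contained sketch of the wire-format change (the Entry struct below is an illustrative stand-in for crate::entry::Entry; requires the serde and bincode crates):

use serde::{Deserialize, Serialize};

// Stand-in for crate::entry::Entry, only to show the payload format.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct Entry {
    num_hashes: u64,
    hash: [u8; 32],
}

fn main() {
    let entries = vec![
        Entry { num_hashes: 1, hash: [0u8; 32] },
        Entry { num_hashes: 2, hash: [1u8; 32] },
    ];
    // Old payload: one bincode-serialized Entry per blob.
    // New payload: a whole bincode-serialized Vec<Entry> per blob.
    let payload = bincode::serialize(&entries).unwrap();
    let decoded: Vec<Entry> = bincode::deserialize(&payload).unwrap();
    assert_eq!(decoded, entries);
}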
carllin 2019-03-17 18:48:23 -07:00 committed by GitHub
parent a35ebe1186
commit 60437a8dcb
8 changed files with 409 additions and 380 deletions

Cargo.lock (generated, 616 changed lines): diff suppressed because it is too large.

View File

@@ -618,11 +618,11 @@ impl Blocktree {
     {
         blob_datas
             .iter()
-            .map(|blob_data| {
-                let serialized_entry_data = &blob_data.borrow()[BLOB_HEADER_SIZE..];
-                let entry: Entry = deserialize(serialized_entry_data)
+            .flat_map(|blob_data| {
+                let serialized_entries_data = &blob_data.borrow()[BLOB_HEADER_SIZE..];
+                let entries: Vec<Entry> = deserialize(serialized_entries_data)
                     .expect("Ledger should only contain well formed data");
-                entry
+                entries
             })
            .collect()
    }
@@ -1041,7 +1041,7 @@ pub mod tests {
     use crate::entry::{
         create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice,
     };
-    use crate::packet::index_blobs;
+    use crate::packet;
     use rand::seq::SliceRandom;
     use rand::thread_rng;
     use solana_sdk::hash::Hash;
@@ -1210,9 +1210,9 @@ pub mod tests {
     #[test]
     fn test_read_blobs_bytes() {
-        let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
+        let shared_blobs = make_tiny_test_entries(10).to_single_entry_shared_blobs();
         let slot = 0;
-        index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
+        packet::index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
 
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -1372,16 +1372,15 @@ pub mod tests {
             // Write entries
             let num_entries = 8;
             let entries = make_tiny_test_entries(num_entries);
-            let shared_blobs = entries.to_shared_blobs();
+            let mut blobs = entries.to_single_entry_blobs();
 
-            for (i, b) in shared_blobs.iter().enumerate() {
-                let mut w_b = b.write().unwrap();
-                w_b.set_index(1 << (i * 8));
-                w_b.set_slot(0);
+            for (i, b) in blobs.iter_mut().enumerate() {
+                b.set_index(1 << (i * 8));
+                b.set_slot(0);
             }
 
             blocktree
-                .write_shared_blobs(&shared_blobs)
+                .write_blobs(&blobs)
                 .expect("Expected successful write of blobs");
 
             let mut db_iterator = blocktree
@@ -1410,7 +1409,7 @@
         {
             let blocktree = Blocktree::open(&blocktree_path).unwrap();
             let entries = make_tiny_test_entries(8);
-            let mut blobs = entries.clone().to_blobs();
+            let mut blobs = entries.clone().to_single_entry_blobs();
             for (i, b) in blobs.iter_mut().enumerate() {
                 b.set_slot(1);
                 if i < 4 {
@@ -1448,7 +1447,7 @@
         for slot in 0..num_slots {
             let entries = make_tiny_test_entries(slot as usize + 1);
             let last_entry = entries.last().unwrap().clone();
-            let mut blobs = entries.clone().to_blobs();
+            let mut blobs = entries.clone().to_single_entry_blobs();
             for b in blobs.iter_mut() {
                 b.set_index(index);
                 b.set_slot(slot as u64);
@@ -1466,6 +1465,39 @@
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    pub fn test_get_slot_entries3() {
+        // Test inserting/fetching blobs which contain multiple entries per blob
+        let blocktree_path = get_tmp_ledger_path("test_get_slot_entries3");
+        {
+            let blocktree = Blocktree::open(&blocktree_path).unwrap();
+            let num_slots = 5 as u64;
+            let blobs_per_slot = 5 as u64;
+            let entry_serialized_size =
+                bincode::serialized_size(&make_tiny_test_entries(1)).unwrap();
+            let entries_per_slot =
+                (blobs_per_slot * packet::BLOB_DATA_SIZE as u64) / entry_serialized_size;
+
+            // Write entries
+            for slot in 0..num_slots {
+                let mut index = 0;
+                let entries = make_tiny_test_entries(entries_per_slot as usize);
+                let mut blobs = entries.clone().to_blobs();
+                assert_eq!(blobs.len() as u64, blobs_per_slot);
+                for b in blobs.iter_mut() {
+                    b.set_index(index);
+                    b.set_slot(slot as u64);
+                    index += 1;
+                }
+                blocktree
+                    .write_blobs(&blobs)
+                    .expect("Expected successful write of blobs");
+                assert_eq!(
+                    blocktree.get_slot_entries(slot, 0, None).unwrap(),
+                    entries,
+                );
+            }
+        }
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     pub fn test_insert_data_blobs_consecutive() {
         let blocktree_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive");
@@ -2105,7 +2137,7 @@ pub mod tests {
         parent_slot: u64,
         is_full_slot: bool,
     ) -> Vec<Blob> {
-        let mut blobs = entries.clone().to_blobs();
+        let mut blobs = entries.clone().to_single_entry_blobs();
         for (i, b) in blobs.iter_mut().enumerate() {
             b.set_index(i as u64);
             b.set_slot(slot);

View File

@@ -1,4 +1,4 @@
-use crate::entry::Entry;
+use crate::entry::{Entry, EntrySlice};
 use crate::packet::{Blob, BLOB_HEADER_SIZE};
 use crate::result::{Error, Result};
@@ -13,6 +13,7 @@ use rocksdb::{
 use solana_sdk::hash::Hash;
+use std::collections::VecDeque;
 use std::fs;
 use std::io;
 use std::path::Path;
@@ -64,6 +65,7 @@ pub struct EntryIterator {
     // you have to hold the database open in order to iterate over it, and in order
     // for db_iterator to be able to run Drop
     //    _blocktree: Blocktree,
+    entries: VecDeque<Entry>,
 }
 
 impl Blocktree {
@@ -127,6 +129,7 @@ impl Blocktree {
         db_iterator.seek_to_first();
         Ok(EntryIterator {
+            entries: VecDeque::new(),
             db_iterator,
             blockhash: None,
         })
@@ -378,15 +381,24 @@ impl Iterator for EntryIterator {
     type Item = Entry;
 
     fn next(&mut self) -> Option<Entry> {
+        if !self.entries.is_empty() {
+            return Some(self.entries.pop_front().unwrap());
+        }
+
         if self.db_iterator.valid() {
             if let Some(value) = self.db_iterator.value() {
-                if let Ok(entry) = deserialize::<Entry>(&value[BLOB_HEADER_SIZE..]) {
+                if let Ok(next_entries) = deserialize::<Vec<Entry>>(&value[BLOB_HEADER_SIZE..]) {
                     if let Some(blockhash) = self.blockhash {
-                        if !entry.verify(&blockhash) {
+                        if !next_entries.verify(&blockhash) {
                             return None;
                         }
                     }
                     self.db_iterator.next();
+                    if next_entries.is_empty() {
+                        return None;
+                    }
+                    self.entries = VecDeque::from(next_entries);
+                    let entry = self.entries.pop_front().unwrap();
                     self.blockhash = Some(entry.hash);
                     return Some(entry);
                 }
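
Reviewer note: next() now drains a VecDeque buffered from the last blob before advancing the RocksDB cursor. A self-contained sketch of that drain-then-refill pattern (generic u32 items for brevity; the real iterator additionally PoH-verifies each batch against blockhash, and an empty batch ends iteration just as the early return above does):

use std::collections::VecDeque;

// Flattens an iterator of batches into single items, one batch at a time.
struct Batched<I: Iterator<Item = Vec<u32>>> {
    batches: I,
    buffered: VecDeque<u32>,
}

impl<I: Iterator<Item = Vec<u32>>> Iterator for Batched<I> {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        // Serve buffered items first, as EntryIterator::next does.
        if let Some(item) = self.buffered.pop_front() {
            return Some(item);
        }
        // Refill from the next batch (the next blob's Vec<Entry>).
        self.buffered = VecDeque::from(self.batches.next()?);
        // An empty batch yields None here, ending iteration.
        self.buffered.pop_front()
    }
}

fn main() {
    let batched = Batched {
        batches: vec![vec![1, 2], vec![3]].into_iter(),
        buffered: VecDeque::new(),
    };
    assert_eq!(batched.collect::<Vec<u32>>(), vec![1, 2, 3]);
}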

View File

@@ -164,7 +164,7 @@ mod tests {
         use bs58;
         // golden needs to be updated if blob stuff changes....
         let golden = Hash::new(
-            &bs58::decode("C9hBb1U2Pck3jD5gDuh9gLFT9gJu1ess7DG99qQA9TND")
+            &bs58::decode("JCCiVjn5NJDkRzCF2QP2QXzKfk5nba7dyMG9W7yzZHBk")
                 .into_vec()
                 .unwrap(),
         );

View File

@@ -174,7 +174,7 @@ mod test {
         assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
         assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
 
-        let mut blobs = make_tiny_test_entries(2).to_blobs();
+        let mut blobs = make_tiny_test_entries(2).to_single_entry_blobs();
 
         const ONE: u64 = 1;
         const OTHER: u64 = 4;
@@ -215,7 +215,7 @@
         let gap = 10;
         assert!(gap > 3);
         let num_entries = 10;
-        let mut blobs = make_tiny_test_entries(num_entries).to_blobs();
+        let mut blobs = make_tiny_test_entries(num_entries).to_single_entry_blobs();
         for (i, b) in blobs.iter_mut().enumerate() {
             b.set_index(i as u64 * gap);
             b.set_slot(slot);
@@ -296,7 +296,8 @@
         let num_entries_per_slot = 10;
         let num_slots = 2;
-        let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs();
+        let mut blobs =
+            make_tiny_test_entries(num_slots * num_entries_per_slot).to_single_entry_blobs();
 
         // Insert every nth entry for each slot
         let nth = 3;
@@ -365,7 +366,7 @@
         // Write entries
         let num_entries = 10;
-        let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
+        let shared_blobs = make_tiny_test_entries(num_entries).to_single_entry_shared_blobs();
 
         index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);

View File

@@ -5,7 +5,7 @@
 use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE};
 use crate::poh::Poh;
 use crate::result::Result;
-use bincode::{deserialize, serialize_into, serialized_size};
+use bincode::{deserialize, serialized_size};
 use chrono::prelude::Utc;
 use rayon::prelude::*;
 use solana_budget_api::budget_transaction::BudgetTransaction;
@@ -16,7 +16,6 @@ use solana_sdk::transaction::Transaction;
 use solana_vote_api::vote_instruction::Vote;
 use solana_vote_api::vote_transaction::VoteTransaction;
 use std::borrow::Borrow;
-use std::io::Cursor;
 use std::mem::size_of;
 use std::sync::mpsc::{Receiver, Sender};
 use std::sync::{Arc, RwLock};
@@ -102,14 +101,7 @@ impl Entry {
     }
 
     pub fn to_blob(&self) -> Blob {
-        let mut blob = Blob::default();
-        let pos = {
-            let mut out = Cursor::new(blob.data_mut());
-            serialize_into(&mut out, &self).expect("failed to serialize output");
-            out.position() as usize
-        };
-        blob.set_size(pos);
-        blob
+        Blob::from_serializable(&vec![&self])
     }
 
     /// Estimate serialized_size of Entry without creating an Entry.
@@ -197,15 +189,14 @@
     let mut num_ticks = 0;
 
     for blob in blobs.into_iter() {
-        let entry: Entry = {
+        let new_entries: Vec<Entry> = {
             let msg_size = blob.borrow().size();
             deserialize(&blob.borrow().data()[..msg_size])?
         };
 
-        if entry.is_tick() {
-            num_ticks += 1
-        }
-        entries.push(entry)
+        let num_new_ticks: u64 = new_entries.iter().map(|entry| entry.is_tick() as u64).sum();
+        num_ticks += num_new_ticks;
+        entries.extend(new_entries)
     }
     Ok((entries, num_ticks))
 }
@@ -216,6 +207,8 @@ pub trait EntrySlice {
     fn verify(&self, start_hash: &Hash) -> bool;
     fn to_shared_blobs(&self) -> Vec<SharedBlob>;
     fn to_blobs(&self) -> Vec<Blob>;
+    fn to_single_entry_blobs(&self) -> Vec<Blob>;
+    fn to_single_entry_shared_blobs(&self) -> Vec<SharedBlob>;
     fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>;
 }
@@ -242,11 +235,30 @@ impl EntrySlice for [Entry] {
     }
 
     fn to_blobs(&self) -> Vec<Blob> {
-        self.iter().map(|entry| entry.to_blob()).collect()
+        split_serializable_chunks(
+            &self,
+            BLOB_DATA_SIZE as u64,
+            &|s| bincode::serialized_size(&s).unwrap(),
+            &mut |entries: &[Entry]| Blob::from_serializable(entries),
+        )
     }
 
     fn to_shared_blobs(&self) -> Vec<SharedBlob> {
-        self.iter().map(|entry| entry.to_shared_blob()).collect()
+        self.to_blobs()
+            .into_iter()
+            .map(|b| Arc::new(RwLock::new(b)))
+            .collect()
+    }
+
+    fn to_single_entry_shared_blobs(&self) -> Vec<SharedBlob> {
+        self.to_single_entry_blobs()
+            .into_iter()
+            .map(|b| Arc::new(RwLock::new(b)))
+            .collect()
+    }
+
+    fn to_single_entry_blobs(&self) -> Vec<Blob> {
+        self.iter().map(|entry| entry.to_blob()).collect()
     }
 
     fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> {
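
split_serializable_chunks itself is not shown in this diff; judging from the call site in to_blobs, it greedily packs entries into chunks whose serialized size stays within BLOB_DATA_SIZE. A hypothetical sketch of such a helper, inferred from that call site and not the actual implementation:

// Pack items into chunks whose serialized size fits max_size; an item that
// alone exceeds max_size still gets its own chunk.
fn split_chunks_by_serialized_size<T, B>(
    items: &[T],
    max_size: u64,
    serialized_size: &dyn Fn(&[T]) -> u64,
    make_chunk: &mut dyn FnMut(&[T]) -> B,
) -> Vec<B> {
    let mut chunks = Vec::new();
    let mut start = 0;
    for end in 1..=items.len() {
        // When the running chunk outgrows max_size, emit all but the newest item.
        if end - start > 1 && serialized_size(&items[start..end]) > max_size {
            chunks.push(make_chunk(&items[start..end - 1]));
            start = end - 1;
        }
    }
    if !items.is_empty() {
        chunks.push(make_chunk(&items[start..]));
    }
    chunks
}

Re-measuring the whole candidate chunk on every step costs a serialization pass per entry; a running-size accumulator would avoid that, but the measure-closure interface above mirrors what the call site passes.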
@@ -422,7 +434,7 @@ pub fn make_consecutive_blobs(
 ) -> Vec<SharedBlob> {
     let entries = create_ticks(num_blobs_to_make, start_hash);
 
-    let blobs = entries.to_shared_blobs();
+    let blobs = entries.to_single_entry_shared_blobs();
     let mut index = start_height;
     for blob in &blobs {
         let mut blob = blob.write().unwrap();
@@ -596,7 +608,7 @@ mod tests {
     }
 
     #[test]
-    fn test_entries_to_shared_blobs() {
+    fn test_entries_to_blobs() {
         solana_logger::setup();
         let entries = make_test_entries();
@@ -605,6 +617,23 @@
         assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap().0, entries);
     }
 
+    #[test]
+    fn test_multiple_entries_to_blobs() {
+        solana_logger::setup();
+        let num_blobs = 10;
+        let serialized_size =
+            bincode::serialized_size(&make_tiny_test_entries_from_hash(&Hash::default(), 1))
+                .unwrap();
+        let num_entries = (num_blobs * BLOB_DATA_SIZE as u64) / serialized_size;
+        let entries = make_tiny_test_entries_from_hash(&Hash::default(), num_entries as usize);
+        let blob_q = entries.to_blobs();
+
+        assert_eq!(blob_q.len() as u64, num_blobs);
+        assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap().0, entries);
+    }
+
     #[test]
     fn test_bad_blobs_attack() {
         solana_logger::setup();

View File

@@ -901,7 +901,7 @@ pub mod test {
     }
 
     fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
-        let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();
+        let blobs = make_tiny_test_entries(num_blobs).to_single_entry_shared_blobs();
 
         index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 0, 0);
         blobs

View File

@@ -363,6 +363,17 @@ impl Blob {
         blob
     }
 
+    pub fn from_serializable<T: Serialize + ?Sized>(data: &T) -> Self {
+        let mut blob = Self::default();
+        let pos = {
+            let mut out = Cursor::new(blob.data_mut());
+            bincode::serialize_into(&mut out, data).expect("failed to serialize output");
+            out.position() as usize
+        };
+        blob.set_size(pos);
+        blob
+    }
+
     pub fn parent(&self) -> u64 {
         LittleEndian::read_u64(&self.data[PARENT_RANGE])
     }
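
A usage sketch for the new constructor (hypothetical test, not part of this diff; make_tiny_test_entries is the crate's existing test helper, and the read-back mirrors what reconstruct_entries_from_blobs does with blob.data() and blob.size()):

#[test]
fn test_from_serializable_round_trip() {
    // Serialize a batch of entries into one blob's data region...
    let entries = make_tiny_test_entries(4);
    let blob = Blob::from_serializable(&entries);
    // ...and read it back the same way the ledger readers do.
    let decoded: Vec<Entry> = bincode::deserialize(&blob.data()[..blob.size()]).unwrap();
    assert_eq!(decoded, entries);
}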