2018-12-12 15:58:29 -08:00
|
|
|
//! The `db_ledger` module provides functions for parallel verification of the
|
2018-11-15 15:53:31 -08:00
|
|
|
//! Proof of History ledger as well as iterative read, append write, and random
|
|
|
|
//! access read to a persistent file-based ledger.
|
|
|
|
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::entry::Entry;
|
|
|
|
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
|
|
|
|
use crate::result::{Error, Result};
|
2018-11-15 15:53:31 -08:00
|
|
|
use bincode::{deserialize, serialize};
|
2018-11-22 01:35:19 -08:00
|
|
|
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
|
2018-12-19 16:11:47 -08:00
|
|
|
use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, Options, WriteBatch, DB};
|
2018-11-15 15:53:31 -08:00
|
|
|
use serde::de::DeserializeOwned;
|
|
|
|
use serde::Serialize;
|
2018-12-11 09:14:23 -08:00
|
|
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
2018-11-24 19:32:33 -08:00
|
|
|
use std::borrow::Borrow;
|
2018-12-19 16:11:47 -08:00
|
|
|
use std::cmp::max;
|
2018-11-15 15:53:31 -08:00
|
|
|
use std::io;
|
2018-12-11 09:14:23 -08:00
|
|
|
use std::path::Path;
|
2018-11-15 15:53:31 -08:00
|
|
|
|
2018-12-11 09:14:23 -08:00
|
|
|
/// Name of the sub-directory, below the ledger path, that holds the RocksDB files.
pub const DB_LEDGER_DIRECTORY: &str = "rocksdb";

/// Parallelism level handed to RocksDB when the database is opened.
///
/// A good value for this is the number of cores on the machine.
pub const TOTAL_THREADS: i32 = 8;
|
2018-11-15 15:53:31 -08:00
|
|
|
|
2018-12-20 09:57:29 -08:00
|
|
|
#[derive(Debug)]
|
2018-11-15 15:53:31 -08:00
|
|
|
pub enum DbLedgerError {
|
|
|
|
BlobForIndexExists,
|
|
|
|
InvalidBlobData,
|
2018-12-20 09:57:29 -08:00
|
|
|
RocksDb(rocksdb::Error),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl std::convert::From<rocksdb::Error> for Error {
|
|
|
|
fn from(e: rocksdb::Error) -> Error {
|
|
|
|
Error::DbLedgerError(DbLedgerError::RocksDb(e))
|
|
|
|
}
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
pub trait LedgerColumnFamily {
|
|
|
|
type ValueType: DeserializeOwned + Serialize;
|
|
|
|
|
|
|
|
fn get(&self, db: &DB, key: &[u8]) -> Result<Option<Self::ValueType>> {
|
|
|
|
let data_bytes = db.get_cf(self.handle(db), key)?;
|
|
|
|
|
|
|
|
if let Some(raw) = data_bytes {
|
|
|
|
let result: Self::ValueType = deserialize(&raw)?;
|
|
|
|
Ok(Some(result))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_bytes(&self, db: &DB, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
|
|
|
let data_bytes = db.get_cf(self.handle(db), key)?;
|
|
|
|
Ok(data_bytes.map(|x| x.to_vec()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn put_bytes(&self, db: &DB, key: &[u8], serialized_value: &[u8]) -> Result<()> {
|
|
|
|
db.put_cf(self.handle(db), &key, &serialized_value)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn put(&self, db: &DB, key: &[u8], value: &Self::ValueType) -> Result<()> {
|
|
|
|
let serialized = serialize(value)?;
|
|
|
|
db.put_cf(self.handle(db), &key, &serialized)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn delete(&self, db: &DB, key: &[u8]) -> Result<()> {
|
|
|
|
db.delete_cf(self.handle(db), &key)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn handle(&self, db: &DB) -> ColumnFamily;
|
|
|
|
}
|
|
|
|
|
|
|
|
pub trait LedgerColumnFamilyRaw {
|
|
|
|
fn get(&self, db: &DB, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
|
|
|
let data_bytes = db.get_cf(self.handle(db), key)?;
|
|
|
|
Ok(data_bytes.map(|x| x.to_vec()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn put(&self, db: &DB, key: &[u8], serialized_value: &[u8]) -> Result<()> {
|
|
|
|
db.put_cf(self.handle(db), &key, &serialized_value)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn delete(&self, db: &DB, key: &[u8]) -> Result<()> {
|
|
|
|
db.delete_cf(self.handle(db), &key)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn handle(&self, db: &DB) -> ColumnFamily;
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
|
|
|
|
// The Meta column family
|
|
|
|
pub struct SlotMeta {
|
|
|
|
// The total number of consecutive blob starting from index 0
|
|
|
|
// we have received for this slot.
|
|
|
|
pub consumed: u64,
|
|
|
|
// The entry height of the highest blob received for this slot.
|
|
|
|
pub received: u64,
|
2018-12-12 15:58:29 -08:00
|
|
|
// The slot the blob with index == "consumed" is in
|
|
|
|
pub consumed_slot: u64,
|
|
|
|
// The slot the blob with index == "received" is in
|
|
|
|
pub received_slot: u64,
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl SlotMeta {
|
|
|
|
fn new() -> Self {
|
|
|
|
SlotMeta {
|
|
|
|
consumed: 0,
|
|
|
|
received: 0,
|
2018-12-12 15:58:29 -08:00
|
|
|
consumed_slot: 0,
|
|
|
|
received_slot: 0,
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
pub struct MetaCf {}
|
|
|
|
|
|
|
|
impl MetaCf {
|
|
|
|
pub fn key(slot_height: u64) -> Vec<u8> {
|
|
|
|
let mut key = vec![0u8; 8];
|
2018-11-22 01:35:19 -08:00
|
|
|
BigEndian::write_u64(&mut key[0..8], slot_height);
|
2018-11-15 15:53:31 -08:00
|
|
|
key
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl LedgerColumnFamily for MetaCf {
|
|
|
|
type ValueType = SlotMeta;
|
|
|
|
|
|
|
|
fn handle(&self, db: &DB) -> ColumnFamily {
|
|
|
|
db.cf_handle(META_CF).unwrap()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The data column family
|
|
|
|
#[derive(Default)]
|
|
|
|
pub struct DataCf {}
|
|
|
|
|
|
|
|
impl DataCf {
|
2018-11-19 23:20:18 -08:00
|
|
|
pub fn get_by_slot_index(
|
|
|
|
&self,
|
|
|
|
db: &DB,
|
|
|
|
slot_height: u64,
|
|
|
|
index: u64,
|
|
|
|
) -> Result<Option<Vec<u8>>> {
|
|
|
|
let key = Self::key(slot_height, index);
|
|
|
|
self.get(db, &key)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn put_by_slot_index(
|
|
|
|
&self,
|
|
|
|
db: &DB,
|
|
|
|
slot_height: u64,
|
|
|
|
index: u64,
|
|
|
|
serialized_value: &[u8],
|
|
|
|
) -> Result<()> {
|
|
|
|
let key = Self::key(slot_height, index);
|
|
|
|
self.put(db, &key, serialized_value)
|
|
|
|
}
|
|
|
|
|
2018-11-15 15:53:31 -08:00
|
|
|
pub fn key(slot_height: u64, index: u64) -> Vec<u8> {
|
|
|
|
let mut key = vec![0u8; 16];
|
2018-11-22 01:35:19 -08:00
|
|
|
BigEndian::write_u64(&mut key[0..8], slot_height);
|
|
|
|
BigEndian::write_u64(&mut key[8..16], index);
|
2018-11-15 15:53:31 -08:00
|
|
|
key
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn slot_height_from_key(key: &[u8]) -> Result<u64> {
|
|
|
|
let mut rdr = io::Cursor::new(&key[0..8]);
|
2018-11-22 01:35:19 -08:00
|
|
|
let height = rdr.read_u64::<BigEndian>()?;
|
2018-11-15 15:53:31 -08:00
|
|
|
Ok(height)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn index_from_key(key: &[u8]) -> Result<u64> {
|
|
|
|
let mut rdr = io::Cursor::new(&key[8..16]);
|
2018-11-22 01:35:19 -08:00
|
|
|
let index = rdr.read_u64::<BigEndian>()?;
|
2018-11-15 15:53:31 -08:00
|
|
|
Ok(index)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl LedgerColumnFamilyRaw for DataCf {
|
|
|
|
fn handle(&self, db: &DB) -> ColumnFamily {
|
|
|
|
db.cf_handle(DATA_CF).unwrap()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The erasure column family
|
|
|
|
#[derive(Default)]
|
|
|
|
pub struct ErasureCf {}
|
|
|
|
|
|
|
|
impl ErasureCf {
|
2018-12-05 12:47:19 -08:00
|
|
|
pub fn delete_by_slot_index(&self, db: &DB, slot_height: u64, index: u64) -> Result<()> {
|
|
|
|
let key = Self::key(slot_height, index);
|
|
|
|
self.delete(db, &key)
|
|
|
|
}
|
|
|
|
|
2018-11-19 23:20:18 -08:00
|
|
|
pub fn get_by_slot_index(
|
|
|
|
&self,
|
|
|
|
db: &DB,
|
|
|
|
slot_height: u64,
|
|
|
|
index: u64,
|
|
|
|
) -> Result<Option<Vec<u8>>> {
|
|
|
|
let key = Self::key(slot_height, index);
|
|
|
|
self.get(db, &key)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn put_by_slot_index(
|
|
|
|
&self,
|
|
|
|
db: &DB,
|
|
|
|
slot_height: u64,
|
|
|
|
index: u64,
|
|
|
|
serialized_value: &[u8],
|
|
|
|
) -> Result<()> {
|
|
|
|
let key = Self::key(slot_height, index);
|
|
|
|
self.put(db, &key, serialized_value)
|
|
|
|
}
|
|
|
|
|
2018-11-15 15:53:31 -08:00
|
|
|
pub fn key(slot_height: u64, index: u64) -> Vec<u8> {
|
|
|
|
DataCf::key(slot_height, index)
|
|
|
|
}
|
2018-11-19 23:20:18 -08:00
|
|
|
|
2018-12-12 15:58:29 -08:00
|
|
|
pub fn slot_height_from_key(key: &[u8]) -> Result<u64> {
|
|
|
|
DataCf::slot_height_from_key(key)
|
|
|
|
}
|
|
|
|
|
2018-11-19 23:20:18 -08:00
|
|
|
pub fn index_from_key(key: &[u8]) -> Result<u64> {
|
|
|
|
DataCf::index_from_key(key)
|
|
|
|
}
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl LedgerColumnFamilyRaw for ErasureCf {
|
|
|
|
fn handle(&self, db: &DB) -> ColumnFamily {
|
|
|
|
db.cf_handle(ERASURE_CF).unwrap()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ledger window
|
|
|
|
pub struct DbLedger {
|
|
|
|
// Underlying database is automatically closed in the Drop implementation of DB
|
|
|
|
pub db: DB,
|
|
|
|
pub meta_cf: MetaCf,
|
|
|
|
pub data_cf: DataCf,
|
|
|
|
pub erasure_cf: ErasureCf,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Slot height used wherever the caller cannot yet supply one.
///
/// TODO: Once we support a window that knows about different leader
/// slots, change functions where this is used to take slot height
/// as a variable argument
pub const DEFAULT_SLOT_HEIGHT: u64 = 0;

/// Column family for metadata about a leader slot
pub const META_CF: &str = "meta";

/// Column family for the data in a leader slot
pub const DATA_CF: &str = "data";

/// Column family for erasure data
pub const ERASURE_CF: &str = "erasure";
|
|
|
|
|
|
|
|
impl DbLedger {
|
|
|
|
// Opens a Ledger in directory, provides "infinite" window of blobs
|
|
|
|
pub fn open(ledger_path: &str) -> Result<Self> {
|
2018-12-11 09:14:23 -08:00
|
|
|
let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY);
|
2018-11-24 19:32:33 -08:00
|
|
|
|
2018-11-15 15:53:31 -08:00
|
|
|
// Use default database options
|
2018-12-19 16:11:47 -08:00
|
|
|
let db_options = Self::get_db_options();
|
2018-11-15 15:53:31 -08:00
|
|
|
|
|
|
|
// Column family names
|
2018-12-19 16:11:47 -08:00
|
|
|
let meta_cf_descriptor = ColumnFamilyDescriptor::new(META_CF, Self::get_cf_options());
|
|
|
|
let data_cf_descriptor = ColumnFamilyDescriptor::new(DATA_CF, Self::get_cf_options());
|
|
|
|
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(ERASURE_CF, Self::get_cf_options());
|
|
|
|
let cfs = vec![
|
|
|
|
meta_cf_descriptor,
|
|
|
|
data_cf_descriptor,
|
|
|
|
erasure_cf_descriptor,
|
|
|
|
];
|
2018-11-15 15:53:31 -08:00
|
|
|
|
|
|
|
// Open the database
|
2018-12-19 16:11:47 -08:00
|
|
|
let db = DB::open_cf_descriptors(&db_options, ledger_path, cfs)?;
|
2018-11-15 15:53:31 -08:00
|
|
|
|
|
|
|
// Create the metadata column family
|
|
|
|
let meta_cf = MetaCf::default();
|
|
|
|
|
|
|
|
// Create the data column family
|
|
|
|
let data_cf = DataCf::default();
|
|
|
|
|
|
|
|
// Create the erasure column family
|
|
|
|
let erasure_cf = ErasureCf::default();
|
|
|
|
|
|
|
|
Ok(DbLedger {
|
|
|
|
db,
|
|
|
|
meta_cf,
|
|
|
|
data_cf,
|
|
|
|
erasure_cf,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-11-24 19:32:33 -08:00
|
|
|
pub fn destroy(ledger_path: &str) -> Result<()> {
|
2018-12-11 09:14:23 -08:00
|
|
|
let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY);
|
2018-11-24 19:32:33 -08:00
|
|
|
DB::destroy(&Options::default(), &ledger_path)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-12-18 15:18:57 -08:00
|
|
|
pub fn write_shared_blobs<I>(&self, shared_blobs: I) -> Result<Vec<Entry>>
|
2018-11-24 19:32:33 -08:00
|
|
|
where
|
|
|
|
I: IntoIterator,
|
|
|
|
I::Item: Borrow<SharedBlob>,
|
|
|
|
{
|
2018-12-19 16:11:47 -08:00
|
|
|
let c_blobs: Vec<_> = shared_blobs
|
|
|
|
.into_iter()
|
|
|
|
.map(move |s| s.borrow().clone())
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
let r_blobs: Vec<_> = c_blobs.iter().map(move |b| b.read().unwrap()).collect();
|
|
|
|
|
|
|
|
let blobs = r_blobs.iter().map(|s| &**s);
|
|
|
|
|
|
|
|
let new_entries = self.insert_data_blobs(blobs)?;
|
|
|
|
Ok(new_entries)
|
2018-11-19 23:20:18 -08:00
|
|
|
}
|
|
|
|
|
2018-12-18 15:18:57 -08:00
|
|
|
pub fn write_blobs<'a, I>(&self, blobs: I) -> Result<Vec<Entry>>
|
2018-11-15 15:53:31 -08:00
|
|
|
where
|
|
|
|
I: IntoIterator<Item = &'a &'a Blob>,
|
|
|
|
{
|
2018-12-19 16:11:47 -08:00
|
|
|
let blobs = blobs.into_iter().cloned();
|
|
|
|
let new_entries = self.insert_data_blobs(blobs)?;
|
|
|
|
Ok(new_entries)
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
|
2018-12-18 15:18:57 -08:00
|
|
|
pub fn write_entries<I>(&self, slot: u64, entries: I) -> Result<Vec<Entry>>
|
2018-11-24 19:32:33 -08:00
|
|
|
where
|
|
|
|
I: IntoIterator,
|
|
|
|
I::Item: Borrow<Entry>,
|
|
|
|
{
|
2018-12-12 20:42:12 -08:00
|
|
|
let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
|
|
|
|
let b = entry.borrow().to_blob();
|
2018-12-14 17:05:41 -08:00
|
|
|
{
|
|
|
|
let mut w_b = b.write().unwrap();
|
|
|
|
w_b.set_index(idx as u64).unwrap();
|
|
|
|
w_b.set_slot(slot).unwrap();
|
|
|
|
}
|
2018-12-12 20:42:12 -08:00
|
|
|
b
|
|
|
|
});
|
|
|
|
|
2018-12-14 17:05:41 -08:00
|
|
|
self.write_shared_blobs(shared_blobs)
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
|
2018-12-19 16:11:47 -08:00
|
|
|
pub fn insert_data_blobs<I>(&self, new_blobs: I) -> Result<Vec<Entry>>
|
|
|
|
where
|
|
|
|
I: IntoIterator,
|
|
|
|
I::Item: Borrow<Blob>,
|
|
|
|
{
|
|
|
|
let mut new_blobs: Vec<_> = new_blobs.into_iter().collect();
|
|
|
|
|
|
|
|
if new_blobs.is_empty() {
|
|
|
|
return Ok(vec![]);
|
|
|
|
}
|
|
|
|
|
|
|
|
new_blobs.sort_unstable_by(|b1, b2| {
|
|
|
|
b1.borrow()
|
|
|
|
.index()
|
|
|
|
.unwrap()
|
|
|
|
.cmp(&b2.borrow().index().unwrap())
|
|
|
|
});
|
|
|
|
|
2018-12-12 15:58:29 -08:00
|
|
|
let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
|
2018-11-15 15:53:31 -08:00
|
|
|
|
|
|
|
let mut should_write_meta = false;
|
|
|
|
|
|
|
|
let mut meta = {
|
|
|
|
if let Some(meta) = self.db.get_cf(self.meta_cf.handle(&self.db), &meta_key)? {
|
|
|
|
deserialize(&meta)?
|
|
|
|
} else {
|
|
|
|
should_write_meta = true;
|
|
|
|
SlotMeta::new()
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// TODO: Handle if leader sends different blob for same index when the index > consumed
|
|
|
|
// The old window implementation would just replace that index.
|
2018-12-19 16:11:47 -08:00
|
|
|
let lowest_index = new_blobs[0].borrow().index()?;
|
|
|
|
let lowest_slot = new_blobs[0].borrow().slot()?;
|
|
|
|
let highest_index = new_blobs.last().unwrap().borrow().index()?;
|
|
|
|
let highest_slot = new_blobs.last().unwrap().borrow().slot()?;
|
|
|
|
if lowest_index < meta.consumed {
|
2018-11-15 15:53:31 -08:00
|
|
|
return Err(Error::DbLedgerError(DbLedgerError::BlobForIndexExists));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Index is zero-indexed, while the "received" height starts from 1,
|
|
|
|
// so received = index + 1 for the same blob.
|
2018-12-19 16:11:47 -08:00
|
|
|
if highest_index >= meta.received {
|
|
|
|
meta.received = highest_index + 1;
|
|
|
|
meta.received_slot = highest_slot;
|
2018-11-15 15:53:31 -08:00
|
|
|
should_write_meta = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut consumed_queue = vec![];
|
|
|
|
|
2018-12-19 16:11:47 -08:00
|
|
|
if meta.consumed == lowest_index {
|
2018-11-15 15:53:31 -08:00
|
|
|
// Find the next consecutive block of blobs.
|
|
|
|
// TODO: account for consecutive blocks that
|
|
|
|
// span multiple slots
|
2018-12-19 16:11:47 -08:00
|
|
|
should_write_meta = true;
|
|
|
|
let mut index_into_blob = 0;
|
|
|
|
let mut current_index = lowest_index;
|
|
|
|
let mut current_slot = lowest_slot;
|
|
|
|
'outer: loop {
|
|
|
|
let entry: Entry = {
|
|
|
|
let (next_new_blob, new_blob_index) = {
|
|
|
|
if index_into_blob < new_blobs.len() {
|
|
|
|
let blob = new_blobs[index_into_blob].borrow();
|
|
|
|
(Some(blob), Some(blob.index()?))
|
2018-12-12 15:58:29 -08:00
|
|
|
} else {
|
2018-12-19 16:11:47 -08:00
|
|
|
(None, None)
|
2018-12-12 15:58:29 -08:00
|
|
|
}
|
2018-12-19 16:11:47 -08:00
|
|
|
};
|
|
|
|
|
|
|
|
if new_blob_index == Some(current_index) {
|
|
|
|
index_into_blob += 1;
|
|
|
|
let next_new_blob = next_new_blob.unwrap();
|
|
|
|
current_slot = next_new_blob.slot()?;
|
|
|
|
let serialized_entry_data = &next_new_blob.data
|
|
|
|
[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_new_blob.size()?];
|
|
|
|
// Verify entries can actually be reconstructed
|
|
|
|
deserialize(serialized_entry_data).expect(
|
|
|
|
"Blob made it past validation, so must be deserializable at this point",
|
|
|
|
)
|
2018-12-12 15:58:29 -08:00
|
|
|
} else {
|
2018-12-19 16:11:47 -08:00
|
|
|
let key = DataCf::key(current_slot, current_index);
|
|
|
|
let blob_data = {
|
|
|
|
if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
|
|
|
|
blob_data
|
|
|
|
} else if meta.consumed < meta.received {
|
|
|
|
let key = DataCf::key(current_slot + 1, current_index);
|
|
|
|
if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
|
|
|
|
current_slot += 1;
|
|
|
|
meta.consumed_slot = current_slot;
|
|
|
|
blob_data
|
|
|
|
} else {
|
|
|
|
break 'outer;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
break 'outer;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
deserialize(&blob_data[BLOB_HEADER_SIZE..])
|
|
|
|
.expect("Blobs in database must be deserializable")
|
2018-12-12 15:58:29 -08:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
consumed_queue.push(entry);
|
2018-12-19 16:11:47 -08:00
|
|
|
current_index += 1;
|
2018-12-12 15:58:29 -08:00
|
|
|
meta.consumed += 1;
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Commit Step: Atomic write both the metadata and the data
|
|
|
|
let mut batch = WriteBatch::default();
|
|
|
|
if should_write_meta {
|
|
|
|
batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
|
|
|
|
}
|
|
|
|
|
2018-12-19 16:11:47 -08:00
|
|
|
for blob in new_blobs {
|
|
|
|
let blob = blob.borrow();
|
|
|
|
let key = DataCf::key(blob.slot()?, blob.index()?);
|
|
|
|
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
|
|
|
|
batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
|
|
|
|
}
|
|
|
|
|
2018-11-15 15:53:31 -08:00
|
|
|
self.db.write(batch)?;
|
|
|
|
Ok(consumed_queue)
|
|
|
|
}
|
|
|
|
|
2018-12-19 16:11:47 -08:00
|
|
|
// Writes a list of sorted, consecutive broadcast blobs to the db_ledger
|
|
|
|
pub fn write_consecutive_blobs(&self, blobs: &[SharedBlob]) -> Result<()> {
|
|
|
|
assert!(!blobs.is_empty());
|
|
|
|
|
|
|
|
let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
|
|
|
|
|
|
|
|
let mut meta = {
|
|
|
|
if let Some(meta) = self.meta_cf.get(&self.db, &meta_key)? {
|
|
|
|
let first = blobs[0].read().unwrap();
|
|
|
|
assert_eq!(meta.consumed, first.index()?);
|
|
|
|
meta
|
|
|
|
} else {
|
|
|
|
SlotMeta::new()
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
{
|
|
|
|
let last = blobs.last().unwrap().read().unwrap();
|
|
|
|
meta.consumed = last.index()? + 1;
|
|
|
|
meta.consumed_slot = last.slot()?;
|
|
|
|
meta.received = max(meta.received, last.index()? + 1);
|
|
|
|
meta.received_slot = max(meta.received_slot, last.index()?);
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut batch = WriteBatch::default();
|
|
|
|
batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
|
|
|
|
for blob in blobs {
|
|
|
|
let blob = blob.read().unwrap();
|
|
|
|
let key = DataCf::key(blob.slot()?, blob.index()?);
|
|
|
|
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
|
|
|
|
batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
|
|
|
|
}
|
|
|
|
self.db.write(batch)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-11-15 15:53:31 -08:00
|
|
|
// Fill 'buf' with num_blobs or most number of consecutive
|
|
|
|
// whole blobs that fit into buf.len()
|
|
|
|
//
|
|
|
|
// Return tuple of (number of blob read, total size of blobs read)
|
|
|
|
pub fn get_blob_bytes(
|
2018-12-18 15:18:57 -08:00
|
|
|
&self,
|
2018-11-15 15:53:31 -08:00
|
|
|
start_index: u64,
|
|
|
|
num_blobs: u64,
|
|
|
|
buf: &mut [u8],
|
2018-12-12 15:58:29 -08:00
|
|
|
slot_height: u64,
|
2018-11-15 15:53:31 -08:00
|
|
|
) -> Result<(u64, u64)> {
|
2018-12-12 15:58:29 -08:00
|
|
|
let start_key = DataCf::key(slot_height, start_index);
|
2018-11-15 15:53:31 -08:00
|
|
|
let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?;
|
|
|
|
db_iterator.seek(&start_key);
|
|
|
|
let mut total_blobs = 0;
|
|
|
|
let mut total_current_size = 0;
|
|
|
|
for expected_index in start_index..start_index + num_blobs {
|
|
|
|
if !db_iterator.valid() {
|
|
|
|
if expected_index == start_index {
|
|
|
|
return Err(Error::IO(io::Error::new(
|
|
|
|
io::ErrorKind::NotFound,
|
|
|
|
"Blob at start_index not found",
|
|
|
|
)));
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check key is the next sequential key based on
|
|
|
|
// blob index
|
|
|
|
let key = &db_iterator.key().expect("Expected valid key");
|
|
|
|
let index = DataCf::index_from_key(key)?;
|
|
|
|
if index != expected_index {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the blob data
|
|
|
|
let value = &db_iterator.value();
|
|
|
|
|
|
|
|
if value.is_none() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
let value = value.as_ref().unwrap();
|
|
|
|
let blob_data_len = value.len();
|
|
|
|
|
|
|
|
if total_current_size + blob_data_len > buf.len() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
buf[total_current_size..total_current_size + value.len()].copy_from_slice(value);
|
|
|
|
total_current_size += blob_data_len;
|
|
|
|
total_blobs += 1;
|
|
|
|
|
|
|
|
// TODO: Change this logic to support looking for data
|
|
|
|
// that spans multiple leader slots, once we support
|
|
|
|
// a window that knows about different leader slots
|
|
|
|
db_iterator.next();
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok((total_blobs, total_current_size as u64))
|
|
|
|
}
|
2018-12-11 09:14:23 -08:00
|
|
|
|
|
|
|
/// Return an iterator for all the entries in the given file.
|
|
|
|
pub fn read_ledger(&self) -> Result<impl Iterator<Item = Entry>> {
|
|
|
|
let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?;
|
|
|
|
|
|
|
|
db_iterator.seek_to_first();
|
|
|
|
Ok(EntryIterator { db_iterator })
|
|
|
|
}
|
2018-12-19 16:11:47 -08:00
|
|
|
|
|
|
|
fn get_cf_options() -> Options {
|
|
|
|
let mut options = Options::default();
|
|
|
|
options.set_max_write_buffer_number(32);
|
|
|
|
options.set_write_buffer_size(512 * 1024 * 1024);
|
|
|
|
options
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_db_options() -> Options {
|
|
|
|
let mut options = Options::default();
|
|
|
|
options.create_if_missing(true);
|
|
|
|
options.create_missing_column_families(true);
|
|
|
|
options.increase_parallelism(TOTAL_THREADS);
|
|
|
|
options.set_max_background_flushes(4);
|
|
|
|
options.set_max_background_compactions(4);
|
|
|
|
options.set_max_write_buffer_number(32);
|
|
|
|
options.set_write_buffer_size(512 * 1024 * 1024);
|
|
|
|
options
|
|
|
|
}
|
2018-12-11 09:14:23 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
struct EntryIterator {
|
|
|
|
db_iterator: DBRawIterator,
|
|
|
|
// https://github.com/rust-rocksdb/rust-rocksdb/issues/234
|
|
|
|
// rocksdb issue: the _db_ledger member must be lower in the struct to prevent a crash
|
|
|
|
// when the db_iterator member above is dropped.
|
|
|
|
// _db_ledger is unused, but dropping _db_ledger results in a broken db_iterator
|
|
|
|
// you have to hold the database open in order to iterate over it, and in order
|
|
|
|
// for db_iterator to be able to run Drop
|
|
|
|
// _db_ledger: DbLedger,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Iterator for EntryIterator {
|
|
|
|
type Item = Entry;
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Entry> {
|
|
|
|
if self.db_iterator.valid() {
|
|
|
|
if let Some(value) = self.db_iterator.value() {
|
|
|
|
self.db_iterator.next();
|
|
|
|
|
|
|
|
match deserialize(&value[BLOB_HEADER_SIZE..]) {
|
|
|
|
Ok(entry) => Some(entry),
|
|
|
|
_ => None,
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
}
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|
|
|
|
|
2018-12-12 15:58:29 -08:00
|
|
|
pub fn write_entries_to_ledger<I>(ledger_paths: &[&str], entries: I, slot_height: u64)
|
2018-11-24 19:32:33 -08:00
|
|
|
where
|
|
|
|
I: IntoIterator,
|
|
|
|
I::Item: Borrow<Entry>,
|
|
|
|
{
|
|
|
|
let mut entries = entries.into_iter();
|
2018-11-19 23:20:18 -08:00
|
|
|
for ledger_path in ledger_paths {
|
2018-12-18 15:18:57 -08:00
|
|
|
let db_ledger =
|
2018-11-19 23:20:18 -08:00
|
|
|
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
|
|
|
|
db_ledger
|
2018-12-12 15:58:29 -08:00
|
|
|
.write_entries(slot_height, entries.by_ref())
|
2018-11-19 23:20:18 -08:00
|
|
|
.expect("Expected successful write of genesis entries");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-12-12 20:42:12 -08:00
|
|
|
pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()>
|
2018-12-11 09:14:23 -08:00
|
|
|
where
|
|
|
|
I: IntoIterator<Item = &'a Entry>,
|
|
|
|
{
|
2018-12-18 15:18:57 -08:00
|
|
|
let db_ledger = DbLedger::open(ledger_path)?;
|
2018-12-11 09:14:23 -08:00
|
|
|
|
|
|
|
// TODO sign these blobs with keypair
|
2018-12-12 20:42:12 -08:00
|
|
|
let blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
|
|
|
|
let b = entry.borrow().to_blob();
|
|
|
|
b.write().unwrap().set_index(idx as u64).unwrap();
|
|
|
|
b.write().unwrap().set_id(&keypair.pubkey()).unwrap();
|
2018-12-14 17:05:41 -08:00
|
|
|
b.write().unwrap().set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
|
2018-12-12 20:42:12 -08:00
|
|
|
b
|
|
|
|
});
|
2018-12-11 09:14:23 -08:00
|
|
|
|
2018-12-14 17:05:41 -08:00
|
|
|
db_ledger.write_shared_blobs(blobs)?;
|
2018-12-11 09:14:23 -08:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-11-15 15:53:31 -08:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use super::*;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
|
2018-12-12 20:42:12 -08:00
|
|
|
use crate::packet::index_blobs;
|
2018-11-15 15:53:31 -08:00
|
|
|
|
|
|
|
// Round-trips one value through each of the three column families.
#[test]
fn test_put_get_simple() {
    let ledger_path = get_tmp_ledger_path("test_put_get_simple");
    let ledger = DbLedger::open(&ledger_path).unwrap();

    // Test meta column family
    let meta = SlotMeta::new();
    let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
    ledger.meta_cf.put(&ledger.db, &meta_key, &meta).unwrap();
    let stored_meta = ledger
        .meta_cf
        .get(&ledger.db, &meta_key)
        .unwrap()
        .expect("Expected meta object to exist");
    assert_eq!(stored_meta, meta);

    // Test erasure column family
    let erasure = vec![1u8; 16];
    let erasure_key = ErasureCf::key(DEFAULT_SLOT_HEIGHT, 0);
    ledger
        .erasure_cf
        .put(&ledger.db, &erasure_key, &erasure)
        .unwrap();
    let stored_erasure = ledger
        .erasure_cf
        .get(&ledger.db, &erasure_key)
        .unwrap()
        .expect("Expected erasure object to exist");
    assert_eq!(stored_erasure, erasure);

    // Test data column family
    let data = vec![2u8; 16];
    let data_key = DataCf::key(DEFAULT_SLOT_HEIGHT, 0);
    ledger.data_cf.put(&ledger.db, &data_key, &data).unwrap();
    let stored_data = ledger
        .data_cf
        .get(&ledger.db, &data_key)
        .unwrap()
        .expect("Expected data object to exist");
    assert_eq!(stored_data, data);

    // Destroying database without closing it first is undefined behavior
    drop(ledger);
    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
|
|
|
|
|
|
|
// Writes ten blobs and checks `get_blob_bytes` against various counts and buffer sizes.
#[test]
fn test_get_blobs_bytes() {
    let shared_blobs = make_tiny_test_entries(10).to_blobs();
    let slot = DEFAULT_SLOT_HEIGHT;
    index_blobs(
        shared_blobs.iter().zip(vec![slot; 10].into_iter()),
        &Keypair::new().pubkey(),
        0,
    );

    let guards: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
    let blobs: Vec<&Blob> = guards.iter().map(|guard| &**guard).collect();

    let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes");
    let ledger = DbLedger::open(&ledger_path).unwrap();
    ledger.write_blobs(&blobs).unwrap();

    // A single blob fits comfortably
    let mut buf = [0; 1024];
    let (num_blobs, bytes) = ledger.get_blob_bytes(0, 1, &mut buf, slot).unwrap();
    let bytes = bytes as usize;
    assert_eq!(num_blobs, 1);
    {
        let blob_data = &buf[..bytes];
        assert_eq!(blob_data, &blobs[0].data[..bytes]);
    }

    // Two blobs are laid out back to back
    let (num_blobs, bytes2) = ledger.get_blob_bytes(0, 2, &mut buf, slot).unwrap();
    let bytes2 = bytes2 as usize;
    assert_eq!(num_blobs, 2);
    assert!(bytes2 > bytes);
    {
        let blob_data_1 = &buf[..bytes];
        assert_eq!(blob_data_1, &blobs[0].data[..bytes]);

        let blob_data_2 = &buf[bytes..bytes2];
        assert_eq!(blob_data_2, &blobs[1].data[..bytes2 - bytes]);
    }

    // buf size part-way into blob[1], should just return blob[0]
    let mut buf = vec![0; bytes + 1];
    let (num_blobs, bytes3) = ledger.get_blob_bytes(0, 2, &mut buf, slot).unwrap();
    assert_eq!(num_blobs, 1);
    let bytes3 = bytes3 as usize;
    assert_eq!(bytes3, bytes);

    // One byte short of both blobs still yields only the first
    let mut buf = vec![0; bytes2 - 1];
    let (num_blobs, bytes4) = ledger.get_blob_bytes(0, 2, &mut buf, slot).unwrap();
    assert_eq!(num_blobs, 1);
    let bytes4 = bytes4 as usize;
    assert_eq!(bytes4, bytes);

    // Reading the last blob on its own
    let mut buf = vec![0; bytes * 2];
    let (num_blobs, bytes6) = ledger.get_blob_bytes(9, 1, &mut buf, slot).unwrap();
    assert_eq!(num_blobs, 1);
    let bytes6 = bytes6 as usize;

    {
        let blob_data = &buf[..bytes6];
        assert_eq!(blob_data, &blobs[9].data[..bytes6]);
    }

    // Read out of range
    assert!(ledger.get_blob_bytes(20, 2, &mut buf, slot).is_err());

    // Destroying database without closing it first is undefined behavior
    drop(ledger);
    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
|
|
|
|
|
|
|
// Inserts two blobs out of order and checks the returned entries and the metadata.
#[test]
fn test_insert_data_blobs_basic() {
    let entries = make_tiny_test_entries(2);
    let shared_blobs = entries.to_blobs();

    // Number the blobs 0, 1, ...
    for (position, shared) in shared_blobs.iter().enumerate() {
        shared.write().unwrap().set_index(position as u64).unwrap();
    }

    let guards: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
    let blobs: Vec<&Blob> = guards.iter().map(|guard| &**guard).collect();

    let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_basic");
    let ledger = DbLedger::open(&ledger_path).unwrap();
    let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);

    // Insert second blob, we're missing the first blob, so should return nothing
    let result = ledger.insert_data_blobs(vec![blobs[1]]).unwrap();

    assert!(result.len() == 0);
    let meta = ledger
        .meta_cf
        .get(&ledger.db, &meta_key)
        .unwrap()
        .expect("Expected new metadata object to be created");
    assert!(meta.consumed == 0 && meta.received == 2);

    // Insert first blob, check for consecutive returned entries
    let result = ledger.insert_data_blobs(vec![blobs[0]]).unwrap();

    assert_eq!(result, entries);

    let meta = ledger
        .meta_cf
        .get(&ledger.db, &meta_key)
        .unwrap()
        .expect("Expected new metadata object to exist");
    assert!(meta.consumed == 2 && meta.received == 2);

    // Destroying database without closing it first is undefined behavior
    drop(ledger);
    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
|
|
|
|
|
|
|
/// Inserts ten data blobs one at a time in reverse index order and checks that
/// no entries are returned until the blob with index 0 arrives.
#[test]
fn test_insert_data_blobs_multiple() {
    let num_blobs = 10;
    // The metadata counters are compared as `u64`; convert once up front.
    let expected_total = num_blobs as u64;
    let entries = make_tiny_test_entries(num_blobs);
    let shared_blobs = entries.to_blobs();
    for (i, b) in shared_blobs.iter().enumerate() {
        b.write().unwrap().set_index(i as u64).unwrap();
    }
    // Hold the read guards for the whole test so plain `&Blob` references can be handed out.
    let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
    let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

    let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_multiple");
    let ledger = DbLedger::open(&ledger_path).unwrap();

    // Insert blobs in reverse, check for consecutive returned blobs
    for i in (0..num_blobs).rev() {
        let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();

        let meta = ledger
            .meta_cf
            .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
            .unwrap()
            .expect("Expected metadata object to exist");
        // Each counter is asserted on its own so a failure names the field and its value.
        if i != 0 {
            assert_eq!(result.len(), 0);
            assert_eq!(meta.consumed, 0);
            assert_eq!(meta.received, expected_total);
        } else {
            assert_eq!(result, entries);
            assert_eq!(meta.consumed, expected_total);
            assert_eq!(meta.received, expected_total);
        }
    }

    // Destroying database without closing it first is undefined behavior
    drop(ledger);
    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
2018-11-22 01:35:19 -08:00
|
|
|
|
2018-12-12 15:58:29 -08:00
|
|
|
/// Inserts the highest-indexed blob first, then the remaining blobs in reverse
/// order, and checks that entries are only returned once index 0 is inserted.
#[test]
fn test_insert_data_blobs_slots() {
    let num_blobs = 10;
    // The metadata counters are compared as `u64`; convert once up front.
    let expected_total = num_blobs as u64;
    let entries = make_tiny_test_entries(num_blobs);
    let shared_blobs = entries.to_blobs();
    for (i, b) in shared_blobs.iter().enumerate() {
        b.write().unwrap().set_index(i as u64).unwrap();
    }
    // Hold the read guards for the whole test so plain `&Blob` references can be handed out.
    let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
    let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

    let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_slots");
    let ledger = DbLedger::open(&ledger_path).unwrap();

    // Insert last blob into next slot
    // NOTE(review): nothing in this test calls `set_slot`, so whether this blob really
    // lands in a different slot cannot be confirmed from here -- verify the intent.
    let result = ledger
        .insert_data_blobs(vec![*blobs.last().unwrap()])
        .unwrap();
    assert_eq!(result.len(), 0);

    // Insert blobs into first slot, check for consecutive blobs
    for i in (0..num_blobs - 1).rev() {
        let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();
        let meta = ledger
            .meta_cf
            .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
            .unwrap()
            .expect("Expected metadata object to exist");
        // Each counter is asserted on its own so a failure names the field and its value.
        if i != 0 {
            assert_eq!(result.len(), 0);
            assert_eq!(meta.consumed, 0);
            assert_eq!(meta.received, expected_total);
        } else {
            assert_eq!(result, entries);
            assert_eq!(meta.consumed, expected_total);
            assert_eq!(meta.received, expected_total);
        }
    }

    // Destroying database without closing it first is undefined behavior
    drop(ledger);
    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
|
|
|
|
2018-11-22 01:35:19 -08:00
|
|
|
/// Writes blobs whose indexes are `1 << (i * 8)` and checks that a raw iterator
/// over the data column family visits them in increasing index order.
#[test]
pub fn test_iteration_order() {
    let slot = 0;
    // Create RocksDb ledger
    let db_ledger_path = get_tmp_ledger_path("test_iteration_order");
    {
        let db_ledger = DbLedger::open(&db_ledger_path).unwrap();

        // Write entries
        let num_entries = 8;
        let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();

        // Each index sets a single bit in a different byte of the u64.
        for (i, b) in shared_blobs.iter().enumerate() {
            let mut w_b = b.write().unwrap();
            w_b.set_index(1u64 << (i * 8)).unwrap();
            w_b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
        }

        // No blob carries index 0, and the write is expected to return no entries.
        assert_eq!(
            db_ledger
                .write_shared_blobs(&shared_blobs)
                .expect("Expected successful write of blobs"),
            vec![]
        );
        let mut db_iterator = db_ledger
            .db
            .raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db))
            .expect("Expected to be able to open database iterator");

        // Position the iterator at the smallest index written above (1 << 0).
        db_iterator.seek(&DataCf::key(slot, 1));

        // Iterate through ledger
        for i in 0..num_entries {
            assert!(db_iterator.valid());
            let current_key = db_iterator.key().expect("Expected a valid key");
            let current_index = DataCf::index_from_key(&current_key)
                .expect("Expect to be able to parse index from valid key");
            assert_eq!(current_index, 1u64 << (i * 8));
            db_iterator.next();
        }
    }
    // The ledger (and its iterator) went out of scope above, so the database is closed.
    DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
|
2018-12-11 09:14:23 -08:00
|
|
|
|
2018-12-19 16:11:47 -08:00
|
|
|
/// Writes the odd-indexed blobs first and the even-indexed blobs second, and
/// checks that entries are only returned by the second write.
#[test]
pub fn test_insert_data_blobs_bulk() {
    // Create RocksDb ledger
    let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_bulk");
    {
        let db_ledger = DbLedger::open(&db_ledger_path).unwrap();

        // Write entries
        let num_entries = 20u64;
        let original_entries = make_tiny_test_entries(num_entries as usize);
        // `to_blobs` only borrows the entries, so no clone is needed to keep
        // `original_entries` available for the comparison below.
        let shared_blobs = original_entries.to_blobs();
        // Blob `i` gets index `i` and slot `i`.
        for (i, b) in shared_blobs.iter().enumerate() {
            let mut w_b = b.write().unwrap();
            w_b.set_index(i as u64).unwrap();
            w_b.set_slot(i as u64).unwrap();
        }

        // Odd indexes only (1, 3, 5, ...): expected to return no entries.
        assert_eq!(
            db_ledger
                .write_shared_blobs(shared_blobs.iter().skip(1).step_by(2))
                .unwrap(),
            vec![]
        );

        // Even indexes (0, 2, 4, ...): expected to return every original entry.
        assert_eq!(
            db_ledger
                .write_shared_blobs(shared_blobs.iter().step_by(2))
                .unwrap(),
            original_entries
        );

        let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
        let meta = db_ledger
            .meta_cf
            .get(&db_ledger.db, &meta_key)
            .unwrap()
            .unwrap();
        assert_eq!(meta.consumed, num_entries);
        assert_eq!(meta.received, num_entries);
        assert_eq!(meta.consumed_slot, num_entries - 1);
        assert_eq!(meta.received_slot, num_entries - 1);
    }
    // The ledger went out of scope above, so the database is closed before destruction.
    DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
|
|
|
|
|
|
|
|
/// Writes two consecutive batches of blobs with `write_consecutive_blobs` and
/// checks the ledger metadata counters after each batch.
#[test]
pub fn test_write_consecutive_blobs() {
    // Create RocksDb ledger
    let db_ledger_path = get_tmp_ledger_path("test_write_consecutive_blobs");
    {
        let db_ledger = DbLedger::open(&db_ledger_path).unwrap();

        // Write entries
        let num_entries: u64 = 20;
        let original_entries = make_tiny_test_entries(num_entries as usize);
        let shared_blobs = original_entries.to_blobs();

        // Stamps every blob with index and slot `offset + position`.
        let stamp = |offset: u64| {
            for (position, shared) in (0u64..).zip(shared_blobs.iter()) {
                let mut guard = shared.write().unwrap();
                guard.set_index(offset + position).unwrap();
                guard.set_slot(offset + position).unwrap();
            }
        };

        // First batch: indexes and slots 0..num_entries.
        stamp(0);
        db_ledger
            .write_consecutive_blobs(&shared_blobs)
            .expect("Expect successful blob writes");

        let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);

        // Reads the metadata and checks all four counters against `total` blobs written.
        let check_meta = |total: u64| {
            let meta = db_ledger
                .meta_cf
                .get(&db_ledger.db, &meta_key)
                .unwrap()
                .unwrap();
            assert_eq!(meta.consumed, total);
            assert_eq!(meta.received, total);
            assert_eq!(meta.consumed_slot, total - 1);
            assert_eq!(meta.received_slot, total - 1);
        };
        check_meta(num_entries);

        // Second batch: the same blobs re-stamped to continue right after the first batch.
        stamp(num_entries);
        db_ledger
            .write_consecutive_blobs(&shared_blobs)
            .expect("Expect successful blob writes");

        check_meta(2 * num_entries);
    }
    DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
|
|
|
|
|
2018-12-11 09:14:23 -08:00
|
|
|
/// Creates a ledger via `genesis` from 100 entries, reopens it, and checks
/// that `read_ledger` yields the same entries.
#[test]
pub fn test_genesis_and_entry_iterator() {
    let entries = make_tiny_test_entries(100);
    let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");

    // The genesis result is intentionally not bound, so it is released
    // at the end of this statement, before the ledger is opened below.
    assert!(genesis(&ledger_path, &Keypair::new(), &entries).is_ok());

    let ledger = DbLedger::open(&ledger_path).expect("open failed");

    let read_entries = ledger
        .read_ledger()
        .expect("read_ledger failed")
        .collect::<Vec<Entry>>();
    assert_eq!(entries, read_entries);

    // Close the ledger explicitly before destroying the database on disk.
    drop(ledger);

    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
|
|
|
|
|
2018-11-15 15:53:31 -08:00
|
|
|
}
|