add genesis and read_ledger to db_ledger (#2097)
This commit is contained in:
parent
e3dfd7b1ab
commit
4d67aca919
101
src/db_ledger.rs
101
src/db_ledger.rs
|
@ -7,15 +7,15 @@ use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
|
|||
use crate::result::{Error, Result};
|
||||
use bincode::{deserialize, serialize};
|
||||
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
|
||||
use rocksdb::{ColumnFamily, Options, WriteBatch, DB};
|
||||
use rocksdb::{ColumnFamily, DBRawIterator, Options, WriteBatch, DB};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use std::borrow::Borrow;
|
||||
use std::io;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::path::Path;
|
||||
|
||||
pub const DB_LEDGER_DIRECTORY: &str = "db_ledger";
|
||||
pub const DB_LEDGER_DIRECTORY: &str = "rocksdb";
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum DbLedgerError {
|
||||
|
@ -239,7 +239,7 @@ pub const ERASURE_CF: &str = "erasure";
|
|||
impl DbLedger {
|
||||
// Opens a Ledger in directory, provides "infinite" window of blobs
|
||||
pub fn open(ledger_path: &str) -> Result<Self> {
|
||||
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
|
||||
let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY);
|
||||
|
||||
// Use default database options
|
||||
let mut options = Options::default();
|
||||
|
@ -270,7 +270,7 @@ impl DbLedger {
|
|||
}
|
||||
|
||||
pub fn destroy(ledger_path: &str) -> Result<()> {
|
||||
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
|
||||
let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY);
|
||||
DB::destroy(&Options::default(), &ledger_path)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -310,14 +310,10 @@ impl DbLedger {
|
|||
I: IntoIterator,
|
||||
I::Item: Borrow<Entry>,
|
||||
{
|
||||
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
|
||||
let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
|
||||
entry.borrow().to_blob(
|
||||
Some(idx as u64),
|
||||
Some(Pubkey::default()),
|
||||
Some(&default_addr),
|
||||
)
|
||||
});
|
||||
let shared_blobs = entries
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), None, None));
|
||||
self.write_shared_blobs(slot, shared_blobs)
|
||||
}
|
||||
|
||||
|
@ -454,6 +450,46 @@ impl DbLedger {
|
|||
|
||||
Ok((total_blobs, total_current_size as u64))
|
||||
}
|
||||
|
||||
/// Return an iterator for all the entries in the given file.
|
||||
pub fn read_ledger(&self) -> Result<impl Iterator<Item = Entry>> {
|
||||
let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?;
|
||||
|
||||
db_iterator.seek_to_first();
|
||||
Ok(EntryIterator { db_iterator })
|
||||
}
|
||||
}
|
||||
|
||||
struct EntryIterator {
|
||||
db_iterator: DBRawIterator,
|
||||
// https://github.com/rust-rocksdb/rust-rocksdb/issues/234
|
||||
// rocksdb issue: the _db_ledger member must be lower in the struct to prevent a crash
|
||||
// when the db_iterator member above is dropped.
|
||||
// _db_ledger is unused, but dropping _db_ledger results in a broken db_iterator
|
||||
// you have to hold the database open in order to iterate over it, and in order
|
||||
// for db_iterator to be able to run Drop
|
||||
// _db_ledger: DbLedger,
|
||||
}
|
||||
|
||||
impl Iterator for EntryIterator {
|
||||
type Item = Entry;
|
||||
|
||||
fn next(&mut self) -> Option<Entry> {
|
||||
if self.db_iterator.valid() {
|
||||
if let Some(value) = self.db_iterator.value() {
|
||||
self.db_iterator.next();
|
||||
|
||||
match deserialize(&value[BLOB_HEADER_SIZE..]) {
|
||||
Ok(entry) => Some(entry),
|
||||
_ => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_entries_to_ledger<I>(ledger_paths: &[&str], entries: I)
|
||||
|
@ -471,6 +507,24 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn genesis<'a, I>(ledger_path: &str, keypair: Option<&Keypair>, entries: I) -> Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = &'a Entry>,
|
||||
{
|
||||
let mut db_ledger = DbLedger::open(ledger_path)?;
|
||||
|
||||
let pubkey = keypair.map(|k| k.pubkey());
|
||||
|
||||
// TODO sign these blobs with keypair
|
||||
let blobs = entries
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), pubkey, None));
|
||||
|
||||
db_ledger.write_shared_blobs(DEFAULT_SLOT_HEIGHT, blobs)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -705,4 +759,23 @@ mod tests {
|
|||
}
|
||||
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_genesis_and_entry_iterator() {
|
||||
// Create RocksDb ledger
|
||||
let entries = make_tiny_test_entries(100);
|
||||
let ledger_path = get_tmp_ledger_path("test_entry_iterator");
|
||||
{
|
||||
assert!(genesis(&ledger_path, None, &entries).is_ok());
|
||||
|
||||
let ledger = DbLedger::open(&ledger_path).expect("open failed");
|
||||
|
||||
let read_entries: Vec<Entry> =
|
||||
ledger.read_ledger().expect("read_ledger failed").collect();
|
||||
assert_eq!(entries, read_entries);
|
||||
}
|
||||
|
||||
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue