diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 1e1adf402..84f34a24f 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -7,15 +7,15 @@ use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; use bincode::{deserialize, serialize}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use rocksdb::{ColumnFamily, Options, WriteBatch, DB}; +use rocksdb::{ColumnFamily, DBRawIterator, Options, WriteBatch, DB}; use serde::de::DeserializeOwned; use serde::Serialize; -use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, KeypairUtil}; use std::borrow::Borrow; use std::io; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::path::Path; -pub const DB_LEDGER_DIRECTORY: &str = "db_ledger"; +pub const DB_LEDGER_DIRECTORY: &str = "rocksdb"; #[derive(Debug, PartialEq, Eq)] pub enum DbLedgerError { @@ -239,7 +239,7 @@ pub const ERASURE_CF: &str = "erasure"; impl DbLedger { // Opens a Ledger in directory, provides "infinite" window of blobs pub fn open(ledger_path: &str) -> Result { - let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY); + let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY); // Use default database options let mut options = Options::default(); @@ -270,7 +270,7 @@ impl DbLedger { } pub fn destroy(ledger_path: &str) -> Result<()> { - let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY); + let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY); DB::destroy(&Options::default(), &ledger_path)?; Ok(()) } @@ -310,14 +310,10 @@ impl DbLedger { I: IntoIterator, I::Item: Borrow, { - let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| { - entry.borrow().to_blob( - Some(idx as u64), - Some(Pubkey::default()), - Some(&default_addr), - ) - }); + let shared_blobs = entries + .into_iter() + .enumerate() + .map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), None, None)); self.write_shared_blobs(slot, shared_blobs) } @@ -454,6 +450,46 @@ impl DbLedger { Ok((total_blobs, total_current_size as u64)) } + + /// Return an iterator for all the entries in the given file. + pub fn read_ledger(&self) -> Result> { + let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?; + + db_iterator.seek_to_first(); + Ok(EntryIterator { db_iterator }) + } +} + +struct EntryIterator { + db_iterator: DBRawIterator, + // https://github.com/rust-rocksdb/rust-rocksdb/issues/234 + // rocksdb issue: the _db_ledger member must be lower in the struct to prevent a crash + // when the db_iterator member above is dropped. + // _db_ledger is unused, but dropping _db_ledger results in a broken db_iterator + // you have to hold the database open in order to iterate over it, and in order + // for db_iterator to be able to run Drop + // _db_ledger: DbLedger, +} + +impl Iterator for EntryIterator { + type Item = Entry; + + fn next(&mut self) -> Option { + if self.db_iterator.valid() { + if let Some(value) = self.db_iterator.value() { + self.db_iterator.next(); + + match deserialize(&value[BLOB_HEADER_SIZE..]) { + Ok(entry) => Some(entry), + _ => None, + } + } else { + None + } + } else { + None + } + } } pub fn write_entries_to_ledger(ledger_paths: &[&str], entries: I) @@ -471,6 +507,24 @@ where } } +pub fn genesis<'a, I>(ledger_path: &str, keypair: Option<&Keypair>, entries: I) -> Result<()> +where + I: IntoIterator, +{ + let mut db_ledger = DbLedger::open(ledger_path)?; + + let pubkey = keypair.map(|k| k.pubkey()); + + // TODO sign these blobs with keypair + let blobs = entries + .into_iter() + .enumerate() + .map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), pubkey, None)); + + db_ledger.write_shared_blobs(DEFAULT_SLOT_HEIGHT, blobs)?; + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -705,4 +759,23 @@ mod tests { } DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); } + + #[test] + pub fn test_genesis_and_entry_iterator() { + // Create RocksDb ledger + let entries = make_tiny_test_entries(100); + let ledger_path = get_tmp_ledger_path("test_entry_iterator"); + { + assert!(genesis(&ledger_path, None, &entries).is_ok()); + + let ledger = DbLedger::open(&ledger_path).expect("open failed"); + + let read_entries: Vec = + ledger.read_ledger().expect("read_ledger failed").collect(); + assert_eq!(entries, read_entries); + } + + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); + } + }