Add API to iterate over slot's blobs (#4276)

This commit is contained in:
carllin 2019-05-13 22:04:54 -07:00 committed by GitHub
parent 88c2d0fad4
commit e20a8329d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 19 deletions

View File

@ -188,6 +188,14 @@ impl Blocktree {
Ok(db_iterator)
}
/// Returns an iterator over all data blobs stored for `slot`.
///
/// Items are `((slot, blob_index), blob_bytes)` pairs. Iteration starts at
/// blob index 0 of the requested slot and stops at the first key belonging
/// to a different slot (keys are ordered by slot, then blob index).
pub fn slot_data_iterator(
    &self,
    slot: u64,
) -> Result<impl Iterator<Item = ((u64, u64), Vec<u8>)>> {
    // Seek directly to the first possible key of `slot`, then cut the scan
    // off at the slot boundary.
    let iter = self.db.iter::<cf::Data>(Some((slot, 0)))?;
    Ok(iter.take_while(move |((key_slot, _), _)| *key_slot == slot))
}
pub fn write_shared_blobs<I>(&self, shared_blobs: I) -> Result<()>
where
I: IntoIterator,
@ -673,7 +681,7 @@ impl Blocktree {
}
/// Returns an iterator over every data blob in the ledger, across all slots.
pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> + '_ {
    // `None` start key: scan the Data column family from its first entry.
    // (The stale pre-change call without the `Option` argument has been
    // removed; it no longer matches the updated `iter` signature.)
    let iter = self.db.iter::<cf::Data>(None).unwrap();
    iter.map(|(_, blob_data)| Blob::new(&blob_data))
}
@ -3042,6 +3050,34 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_slot_data_iterator() {
    // Build a chain of slots, each holding `blobs_per_slot` data blobs.
    let blocktree_path = get_tmp_ledger_path!();
    let blocktree = Blocktree::open(&blocktree_path).unwrap();
    let blobs_per_slot = 10;
    let slots = vec![2, 4, 8, 12];
    let all_blobs = make_chaining_slot_entries(&slots, blobs_per_slot);
    let slot_8_blobs = all_blobs[2].0.clone();
    for (slot_blobs, _) in all_blobs {
        blocktree.insert_data_blobs(&slot_blobs[..]).unwrap();
    }

    // A slot that was never inserted yields an empty iterator.
    let result: Vec<_> = blocktree.slot_data_iterator(5).unwrap().collect();
    assert_eq!(result, vec![]);

    // Slot 8's iterator must reproduce exactly the blobs inserted for it.
    let result: Vec<_> = blocktree
        .slot_data_iterator(8)
        .unwrap()
        .map(|(_, bytes)| Blob::new(&bytes))
        .collect();
    assert_eq!(result.len() as u64, blobs_per_slot);
    assert_eq!(result, slot_8_blobs);

    drop(blocktree);
    Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
mod erasure {
use super::*;
use crate::blocktree::meta::ErasureMetaStatus;

View File

@ -60,7 +60,7 @@ pub trait Backend: Sized + Send + Sync {
fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>;
fn iterator_cf(&self, cf: Self::ColumnFamily) -> Result<Self::Iter>;
fn iterator_cf(&self, cf: Self::ColumnFamily, from: Option<&Self::Key>) -> Result<Self::Iter>;
fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result<Self::Cursor>;
@ -241,16 +241,24 @@ where
})
}
pub fn iter<C>(&self) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>>
pub fn iter<C>(
&self,
start_from: Option<C::Index>,
) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>>
where
C: Column<B>,
{
let iter = self
.backend
.iterator_cf(self.cf_handle::<C>())?
.map(|(key, value)| (C::index(&key), value.into()));
let iter = {
if let Some(index) = start_from {
let key = C::key(index);
self.backend
.iterator_cf(self.cf_handle::<C>(), Some(key.borrow()))?
} else {
self.backend.iterator_cf(self.cf_handle::<C>(), None)?
}
};
Ok(iter)
Ok(iter.map(|(key, value)| (C::index(&key), value.into())))
}
#[inline]
@ -371,13 +379,21 @@ where
})
}
pub fn iter(&self) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>> {
let iter = self
.backend
.iterator_cf(self.handle())?
.map(|(key, value)| (C::index(&key), value.into()));
pub fn iter(
&self,
start_from: Option<C::Index>,
) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>> {
let iter = {
if let Some(index) = start_from {
let key = C::key(index);
self.backend
.iterator_cf(self.handle(), Some(key.borrow()))?
} else {
self.backend.iterator_cf(self.handle(), None)?
}
};
Ok(iter)
Ok(iter.map(|(key, value)| (C::index(&key), value.into())))
}
#[inline]

View File

@ -6,8 +6,8 @@ use crate::result::{Error, Result};
use byteorder::{BigEndian, ByteOrder};
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, IteratorMode, Options,
WriteBatch as RWriteBatch, DB,
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode,
Options, WriteBatch as RWriteBatch, DB,
};
use std::fs;
@ -101,10 +101,17 @@ impl Backend for Rocks {
Ok(())
}
fn iterator_cf(&self, cf: ColumnFamily) -> Result<DBIterator> {
let raw_iter = self.0.iterator_cf(cf, IteratorMode::Start)?;
fn iterator_cf(&self, cf: ColumnFamily, start_from: Option<&[u8]>) -> Result<DBIterator> {
let iter = {
if let Some(start_from) = start_from {
self.0
.iterator_cf(cf, IteratorMode::From(start_from, Direction::Forward))?
} else {
self.0.iterator_cf(cf, IteratorMode::Start)?
}
};
Ok(raw_iter)
Ok(iter)
}
fn raw_iterator_cf(&self, cf: ColumnFamily) -> Result<DBRawIterator> {