feat(): Allow creating database snapshots

Add a `create_snapshot` function, that creates a RocksDB database
snapshot from the given database handle and returns it.

There's a pretty fantastic amount of horribly unsafe Rust going on here
- primarily caused by the fact that the rust-rocksdb snapshot type is,
unlike Iterator or ColumnFamily or any of the other types that really
*should* be, parametrized by the lifetime of a borrowed reference to its
owner database. The abstraction provided by Rustler doesn't allow
non-static lifetime-parametrized struct types to be returned back to
Erlang as resources, for some technical type-mismatch reasons but also
practically because relying on the Erlang garbage collector to free our
stuff doesn't fit at *all* with rust's lifetime model.

The workaround here is to use `mem::transmute` (I told you it was
crazy...) to turn the snapshot reference from a `Snapshot<'a>` to a
`Snapshot<'static>`, and pass *that*, along with a held ARC reference to
the owning DB, to Erlang, and then when the Erlang GC decides it's time
to free the snapshot, go *back* to a first unbounded, then elided
lifetime to force the snapshot to be dropped.

I've done some cursory checking and regular operation (creating a
snapshot from a process then letting that process die) appears to not
leak the snapshots, but we should keep an eye on this to make sure we
don't have any leaks (or worse) in the future.

As a further note, the bit that parses out the argument for the NIFs
that now accept *either* a DB or a snapshot is a little mucky - that
should be revisited at a later date to see if we can clean it up a
little.
This commit is contained in:
Griffin Smith 2017-11-29 17:28:53 -05:00 committed by Ryan Schmukler
parent baad4ab176
commit 81839aa6ae
7 changed files with 353 additions and 51 deletions

View File

@ -4,7 +4,7 @@ defmodule Rox do
"""
alias __MODULE__.{DB,ColumnFamily,Native,Utils,Cursor}
alias __MODULE__.{DB, ColumnFamily, Native, Utils, Cursor, Snapshot}
@type compaction_style :: :level | :universal | :fifo | :none
@type compression_type :: :snappy | :zlib | :bzip2 | :lz4 | :lz4h | :none
@ -139,6 +139,20 @@ defmodule Rox do
end
end
@doc """
Creates a point-in-time snapshot of the given `DB`.
Snapshots are read only views of the database at a point in time - no changes to the database will
be reflected in the view of the snapshot.
"""
@spec create_snapshot(DB.t) :: {:ok, Snapshot.t} | {:error, any}
def create_snapshot(db) do
with {:ok, snapshot} <- Native.create_snapshot(db.resource) do
{:ok, Snapshot.wrap_resource(db, snapshot)}
end
end
@doc """
Create a column family in `db` with `name` and `opts`.
@ -151,17 +165,23 @@ defmodule Rox do
end
@doc """
Gets an existing `ColumnFamily.t` from the database.
Gets an existing `ColumnFamily.t` from the database or snapshot.
The column family must have been created via `create_cf/2` or from `open/3` with the `auto_create_column_families` option.
The column family must have been created via `create_cf/2` or from `open/3` with the
`auto_create_column_families` option.
"""
@spec cf_handle(DB.t, ColumnFamily.name) :: {:ok, ColumnFamily.t} | {:error, any}
@spec cf_handle(DB.t | Snapshot.t, ColumnFamily.name) :: {:ok, ColumnFamily.t} | {:error, any}
def cf_handle(%DB{resource: raw_db} = db, name) do
with {:ok, result} <- Native.cf_handle(raw_db, name) do
{:ok, ColumnFamily.wrap_resource(db, result, name)}
end
end
def cf_handle(%Snapshot{db: %DB{resource: raw_db}} = snapshot, name) do
with {:ok, result} <- Native.cf_handle(raw_db, name) do
{:ok, ColumnFamily.wrap_resource(snapshot, result, name)}
end
end
@doc """
@ -181,15 +201,19 @@ defmodule Rox do
@doc """
Get a key/value pair in the databse or column family with the specified `key`.
Get a key/value pair in the given column family of the given snapshot or database with the
specified `key`.
Optionally takes a list of `read_options`.
For non-binary terms that were stored, they will be automatically decoded.
"""
@spec get(DB.t | ColumnFamily.t, key, read_options) :: {:ok, binary} | {:ok, value} | :not_found | {:error, any}
def get(db_or_cf, key, opts \\ [])
@spec get(DB.t | ColumnFamily.t | Snapshot.t, key, read_options)
:: {:ok, value}
| :not_found
| {:error, any}
def get(db_snapshot_or_cf, key, opts \\ [])
def get(%DB{resource: db}, key, opts) when is_binary(key) and is_list(opts) do
Native.get(db, key, to_map(opts))
|> Utils.decode
@ -198,6 +222,10 @@ defmodule Rox do
Native.get_cf(db, cf, key, to_map(opts))
|> Utils.decode
end
def get(%Snapshot{resource: snapshot}, key, opts) when is_binary(key) and is_list(opts) do
Native.get(snapshot, key, to_map(opts))
|> Utils.decode
end
@doc """
Returns a `Cursor.t` which will iterate records from the provided database or
@ -226,6 +254,11 @@ defmodule Rox do
Cursor.wrap_resource(resource, mode)
end
end
def stream(%Snapshot{resource: snapshot}, mode) do
with {:ok, resource} <- Native.iterate(snapshot, mode) do
Cursor.wrap_resource(resource, mode)
end
end
@doc """

View File

@ -1,20 +1,22 @@
defmodule Rox.ColumnFamily do
@moduledoc """
Struct module representing a handle for a column family within a database.
For working with the column family, see the functions in the top level
`Rox` module.
Implements the `Collectable` and `Enumerable` protocols.
"""
alias Rox.DB
alias Rox.{DB, Snapshot}
@typedoc "A reference to a RocksDB column family"
@type t :: %__MODULE__{
db_resource: binary, cf_resource: binary,
db_reference: reference, name: binary,
db_resource: binary,
cf_resource: binary,
db_reference: reference,
name: binary,
}
defstruct [:db_reference, :db_resource, :cf_resource, :name]
@ -23,8 +25,19 @@ defmodule Rox.ColumnFamily do
@doc false
def wrap_resource(%DB{resource: db_resource, reference: db_reference}, resource, name) do
%__MODULE__{
db_resource: db_resource, db_reference: db_reference,
cf_resource: resource, name: name
db_resource: db_resource,
db_reference: db_reference,
cf_resource: resource,
name: name
}
end
def wrap_resource(%Snapshot{resource: db_resource, reference: db_reference}, resource, name) do
%__MODULE__{
db_resource: db_resource,
db_reference: db_reference,
cf_resource: resource,
name: name
}
end

View File

@ -1,10 +1,10 @@
defmodule Rox.DB do
@moduledoc """
Struct module representing a handle for a database.
For working with the database, see the functions in the top
level `Rox` module.
Implements the `Collectable` and `Enumerable` protocols.
"""

View File

@ -11,6 +11,14 @@ defmodule Rox.Native do
end
end
def create_snapshot(_) do
case :erlang.phash2(1, 1) do
0 -> raise "Nif not loaded"
1 -> {:ok, ""}
2 -> {:error, ""}
end
end
def count(_) do
case :erlang.phash2(1, 1) do
0 -> raise "Nif not loaded"

58
lib/rox/snapshot.ex Normal file
View File

@ -0,0 +1,58 @@
defmodule Rox.Snapshot do
@moduledoc """
Struct module representing a handle for a database snapshot.
Snapshots support all read operations that a `Rox.DB` supports, and implement `Enumerable`, but
not `Collectable`
"""
alias Rox.DB
@typedoc "A reference to a RocksDB database snapshot"
@type t :: %__MODULE__{
resource: binary,
reference: reference,
db: DB.t
}
@enforce_keys [:resource, :reference, :db]
defstruct [
:resource,
:reference,
:db
]
@doc false
def wrap_resource(%DB{} = db, resource) do
%__MODULE__{resource: resource, reference: make_ref(), db: db}
end
defimpl Inspect do
import Inspect.Algebra
def inspect(handle, opts) do
"#Rox.Snapshot<#{to_doc(handle.reference, opts)}>"
end
end
defimpl Enumerable do
def count(snapshot), do: {:ok, Rox.count(snapshot)}
def member?(snapshot, {key, val}) do
with {:ok, stored_val} <- Rox.get(snapshot, key) do
stored_val == {:ok, val}
else
_ -> {:ok, false}
end
end
def member?(_, _), do: {:ok, false}
def reduce(snapshot, cmd, fun) do
Rox.stream(snapshot)
|> Enumerable.reduce(cmd, fun)
end
end
end

View File

@ -20,7 +20,7 @@ use rustler::types::binary::{NifBinary,OwnedNifBinary};
use rustler::types::atom::{NifAtom};
use rustler::types::list::NifListIterator;
use rocksdb::{
DB, IteratorMode, Options, DBCompressionType, WriteOptions,
DB, IteratorMode, Options, Snapshot, DBCompressionType, WriteOptions,
ColumnFamily, Direction, DBIterator, WriteBatch
};
@ -166,28 +166,124 @@ mod atoms {
}
}
struct SnapshotWrapper {
pub snapshot: Option<Snapshot<'static>>,
#[allow(dead_code)] db: Arc<RwLock<DB>>
}
unsafe impl Sync for SnapshotWrapper {}
unsafe impl Send for SnapshotWrapper {}
unsafe fn shorten_snapshot<'a>(s: Snapshot<'static>) -> Snapshot<'a> {
// More dragons: After working mad science to lengthen the lifetime of the snapshot struct when
// creating it, we need to *shorten* its lifetime when its surrounding handle is being dropped,
// to avoid memory leaks
std::mem::transmute(s)
}
impl Deref for SnapshotWrapper {
type Target = Snapshot<'static>;
fn deref(&self) -> &Self::Target {
self.snapshot.as_ref().expect(
"Invariant violation: Attempting to use a freed snapshot handle. Good luck.")
}
}
impl Drop for SnapshotWrapper {
fn drop(&mut self) {
let snapshot = std::mem::replace(&mut self.snapshot, None).expect(
"Invariant violation: Dropping a snapshot handle that's already been dropped.");
unsafe { shorten_snapshot(snapshot) };
}
}
struct SnapshotHandle {
pub snapshot: Arc<SnapshotWrapper>
}
unsafe impl Sync for SnapshotHandle {}
unsafe impl Send for SnapshotHandle {}
impl SnapshotHandle {
fn new<'a>(db: &DBHandle) -> Self {
let read_db = db.read().unwrap();
let snapshot = read_db.snapshot();
// here be dragons!
// The rocksdb snapshot type is (rightfully) parametrized to have the
// same lifetime as its owning DB - this doesn't work for us, however, as we're not using
// Rust lifetimes at *all* to control the lifetime of our values - instead, we rely
// entirely on reference counting and the Erlang garbage collector to free variables.
// Extending this lifetime to static essentially tells Rust to let it live forever, and
// it'll be dropped on its own by the GC handling inside Rustler.
let eternal_snapshot: Snapshot<'static> = unsafe { std::mem::transmute(snapshot) };
let wrapper = SnapshotWrapper{
snapshot: Some(eternal_snapshot),
db: db.deref().clone()
};
SnapshotHandle {
snapshot: Arc::new(wrapper)
}
}
}
impl Deref for SnapshotHandle {
type Target = SnapshotWrapper;
fn deref(&self) -> &Self::Target { self.snapshot.deref() }
}
struct DBHandle {
pub db: Arc<RwLock<DB>>,
}
#[allow(dead_code)]
impl Deref for DBHandle {
type Target = Arc<RwLock<DB>>;
fn deref(&self) -> &Self::Target { &self.db }
}
///
/// Struct representing a held reference to a database for use in refcount pools.
///
/// CFHandle, IteratorHandle, etc need to hold onto these references to prevent the parent database
/// from being dropped before the dependent child objects are
///
enum DatabaseRef {
DB(Arc<RwLock<DB>>),
Snapshot(Arc<SnapshotWrapper>),
}
impl<'a> From<&'a Arc<RwLock<DB>>> for DatabaseRef {
fn from(db: &Arc<RwLock<DB>>) -> Self { DatabaseRef::DB(db.clone()) }
}
impl<'a> From<&'a Arc<SnapshotWrapper>> for DatabaseRef {
fn from(snapshot: &Arc<SnapshotWrapper>) -> Self { DatabaseRef::Snapshot(snapshot.clone()) }
}
struct CFHandle {
pub cf: ColumnFamily,
db: Arc<RwLock<DB>>, // Keep a reference to the DB to prevent segfault bug in Rocks/Rust bindings
#[allow(dead_code)] db: DatabaseRef,
}
unsafe impl Sync for CFHandle {}
unsafe impl Send for CFHandle {}
#[allow(dead_code)]
struct IteratorHandle {
pub iter: RwLock<DBIterator>,
db: Arc<RwLock<DB>>, // Keep a reference to the DB to prevent segfault bug in Rocks/Rust bindings
#[allow(dead_code)] db: DatabaseRef,
}
unsafe impl Sync for IteratorHandle {}
unsafe impl Send for IteratorHandle {}
struct CompressionType {
pub raw: DBCompressionType
}
@ -310,6 +406,7 @@ fn decode_db_options<'a>(env: NifEnv<'a>, arg: NifTerm<'a>) -> NifResult<Options
}
if let Ok(allow_os_buffer) = arg.map_get(atoms::allow_os_buffer().to_term(env)) {
#[allow(deprecated)]
opts.set_allow_os_buffer(allow_os_buffer.decode()?);
}
@ -460,6 +557,16 @@ fn open<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
Ok(resp)
}
fn create_snapshot<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
let db_arc: ResourceArc<DBHandle> = args[0].decode()?;
let db_handle = db_arc.deref();
let resp =
(atoms::ok(), ResourceArc::new(SnapshotHandle::new(db_handle))).encode(env);
Ok(resp)
}
fn count<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
let db_arc: ResourceArc<DBHandle> = args[0].decode()?;
let db_handle = db_arc.deref();
@ -501,7 +608,10 @@ fn create_cf<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>
let cf = handle_error!(env, db.create_cf(name, &opts));
let resp =
(atoms::ok(), ResourceArc::new(CFHandle{cf: cf, db: db_arc.db.clone()})).encode(env);
(atoms::ok(), ResourceArc::new(CFHandle{
cf: cf,
db: DatabaseRef::from(&db_arc.db)
})).encode(env);
Ok(resp)
}
@ -513,7 +623,10 @@ fn cf_handle<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>
let name: &str = args[1].decode()?;
match db.cf_handle(name) {
Some(cf) => Ok((atoms::ok(), ResourceArc::new(CFHandle{cf: cf, db: db_arc.db.clone()})).encode(env)),
Some(cf) => Ok((atoms::ok(), ResourceArc::new(CFHandle{
cf: cf,
db: DatabaseRef::from(&db_arc.db),
})).encode(env)),
None => Ok((atoms::error(), format!("Could not find ColumnFamily named {}", name)).encode(env))
}
}
@ -611,14 +724,16 @@ fn delete_cf<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>
}
fn get<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
let db_arc: ResourceArc<DBHandle> = args[0].decode()?;
let db = db_arc.deref().db.read().unwrap();
let key: NifBinary = args[1].decode()?;
let key = args[1].decode::<NifBinary>()?.as_slice();
let resp =
db.get(key.as_slice());
args[0].decode::<ResourceArc<DBHandle>>().map(|db_arc| {
let db = db_arc.db.read().unwrap();
db.get(key)
}).or_else(|_| {
let snapshot_arc = args[0].decode::<ResourceArc<SnapshotHandle>>()?;
Ok(snapshot_arc.get(key))
})?;
let val_option = handle_error!(env, resp);
@ -634,16 +749,17 @@ fn get<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
}
fn get_cf<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
let db_arc: ResourceArc<DBHandle> = args[0].decode()?;
let db = db_arc.db.read().unwrap();
let cf_arc: ResourceArc<CFHandle> = args[1].decode()?;
let cf = cf_arc.cf;
let key: NifBinary = args[2].decode()?;
let cf = args[1].decode::<ResourceArc<CFHandle>>()?.cf;
let key = args[2].decode::<NifBinary>()?.as_slice();
let resp =
db.get_cf(cf, key.as_slice());
args[0].decode::<ResourceArc<DBHandle>>().map(|db_arc| {
let db = db_arc.db.read().unwrap();
db.get_cf(cf, key)
}).or_else(|_| {
let snapshot_arc = args[0].decode::<ResourceArc<SnapshotHandle>>()?;
Ok(snapshot_arc.get_cf(cf, key))
})?;
let val_option = handle_error!(env, resp);
@ -659,36 +775,56 @@ fn get_cf<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
}
fn iterate<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
let db_arc: ResourceArc<DBHandle> = args[0].decode()?;
let db = db_arc.db.read().unwrap();
let iterator_mode = decode_iterator_mode(args[1])?;
let iter = db.iterator(iterator_mode);
let (iter, db_ref) =
args[0].decode::<ResourceArc<DBHandle>>().and_then(|db_arc| {
let db = db_arc.db.read().unwrap();
Ok((
db.iterator(decode_iterator_mode(args[1])?),
DatabaseRef::from(&db_arc.db)
))
}).or_else(|_| {
let snapshot_arc = args[0].decode::<ResourceArc<SnapshotHandle>>()?;
Ok((
snapshot_arc.snapshot.snapshot.as_ref().unwrap()
.iterator(decode_iterator_mode(args[1])?),
DatabaseRef::from(&snapshot_arc.snapshot)
))
})?;
let resp =
(atoms::ok(), ResourceArc::new(IteratorHandle{
iter: RwLock::new(iter),
db: db_arc.db.clone(),
db: db_ref,
})).encode(env);
Ok(resp)
}
fn iterate_cf<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
let db_arc: ResourceArc<DBHandle> = args[0].decode()?;
let db = db_arc.db.read().unwrap();
let cf = args[1].decode::<ResourceArc<CFHandle>>()?.cf;
let cf_arc: ResourceArc<CFHandle> = args[1].decode()?;
let cf = cf_arc.cf;
let (iter_res, db_ref) =
args[0].decode::<ResourceArc<DBHandle>>().and_then(|db_arc| {
let db = db_arc.db.read().unwrap();
Ok((
db.iterator_cf(cf, decode_iterator_mode(args[2])?),
DatabaseRef::from(&db_arc.db)
))
}).or_else(|_| {
let snapshot_arc = args[0].decode::<ResourceArc<SnapshotHandle>>()?;
Ok((
snapshot_arc.snapshot.snapshot.as_ref().unwrap()
.iterator_cf(cf, decode_iterator_mode(args[2])?),
DatabaseRef::from(&snapshot_arc.snapshot)
))
})?;
let iterator_mode = decode_iterator_mode(args[2])?;
let iter = handle_error!(env, db.iterator_cf(cf, iterator_mode));
let iter = handle_error!(env, iter_res);
let resp =
(atoms::ok(), ResourceArc::new(IteratorHandle{
iter: RwLock::new(iter),
db: db_arc.db.clone(),
db: db_ref,
})).encode(env);
Ok(resp)
@ -752,6 +888,7 @@ fn batch_write<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'
rustler_export_nifs!(
"Elixir.Rox.Native",
[("open", 3, open),
("create_snapshot", 1, create_snapshot),
("create_cf", 3, create_cf),
("cf_handle", 2, cf_handle),
("put", 4, put),
@ -774,5 +911,7 @@ fn on_load<'a>(env: NifEnv<'a>, _load_info: NifTerm<'a>) -> bool {
resource_struct_init!(DBHandle, env);
resource_struct_init!(CFHandle, env);
resource_struct_init!(IteratorHandle, env);
resource_struct_init!(SnapshotWrapper, env);
resource_struct_init!(SnapshotHandle, env);
true
}

View File

@ -97,6 +97,57 @@ defmodule RoxTest do
end
end
describe "snapshots" do
setup %{db: db, people: people} do
:ok = Rox.put(db, "snapshot_read_test", "some_val")
Enum.each((1..9), & :ok = Rox.put(db, "zz#{&1}", &1))
:ok = Rox.put(people, "goedel", "unsure")
{:ok, snapshot} =
Rox.create_snapshot(db)
{:ok, %{snapshot: snapshot}}
end
test "snapshots can be read from", %{snapshot: snapshot} do
assert {:ok, "some_val"} == Rox.get(snapshot, "snapshot_read_test")
end
test "snapshots allow streaming", %{snapshot: snapshot} do
assert cursor =
Rox.stream(snapshot, {:from, "zz", :forward})
assert Enum.to_list(1..9) == cursor |> Enum.take(9) |> Enum.map(&elem(&1, 1))
end
test "snapshots don't see updates to the base db", %{snapshot: snapshot, db: db} do
assert :not_found = Rox.get(snapshot, "snapshot_put_test")
assert :ok = Rox.put(db, "snapshot_put_test", "some_val")
assert {:ok, "some_val"} = Rox.get(db, "snapshot_put_test")
assert :not_found = Rox.get(snapshot, "snapshot_put_test")
end
test "snapshots allow reading from column families", %{snapshot: snapshot} do
{:ok, cf} =
Rox.cf_handle(snapshot, "people")
assert {:ok, "unsure"} = Rox.get(cf, "goedel")
end
test "snapshots don't see updates to column families", %{snapshot: snapshot, people: people} do
{:ok, people_snap} =
Rox.cf_handle(snapshot, people.name)
assert :ok = Rox.put(people, "escher", "loopy")
assert {:ok, "loopy"} = Rox.get(people, "escher")
assert :not_found = Rox.get(people_snap, "escher")
assert :ok = Rox.put(people, "goedel", "uncertain")
assert {:ok, "unsure"} = Rox.get(people_snap, "goedel")
end
end
describe "Batch Operations" do
test "puts and deletes", %{db: db, people: people} do
assert :not_found = Rox.get(db, "batch_put_test")