feat(stream): add support for stream_keys/2 and stream/2

This commit is contained in:
Ryan Schmukler 2016-08-08 21:25:44 -04:00
parent b5a7465f1b
commit 23c36ab9f2
1 changed files with 71 additions and 4 deletions

View File

@ -5,7 +5,7 @@ defmodule Rox do
@type compression_type :: :snappy | :zlib | :bzip2 | :lz4 | :lz4h | :none
@type key :: String.t | binary
@type value :: String.t | binary
@type value :: any
@type db_handle :: binary
@type cf_handle :: binary
@ -114,7 +114,8 @@ defmodule Rox do
{:iterate_upper_bound, binary} |
{:tailing, boolean} |
{:total_order_seek, boolean} |
{:snapshot, snapshot_handle}
{:snapshot, snapshot_handle} |
{:decode, boolean}
]
@type write_options :: [
@ -135,7 +136,7 @@ defmodule Rox do
@type iterator_action :: :first | :last | :next | :prev | binary
@doc """
Open a RocksDB with the specified read options and column family options
Open a RocksDB with the specified database options and column family options
"""
@spec open(path :: file_path, db_opts :: db_options, cf_opts :: cf_options) :: {:ok, db_handle} | {:error, any}
@ -185,8 +186,74 @@ defmodule Rox do
def put(db, cf, key, value, write_opts), do:
:erocksdb.put(db, cf, key, :erlang.term_to_binary(value), write_opts)
@doc """
Retrieve a key/value pair in the default column family
For non binary terms, you may use `decode: true` to automatically decode the binary back into the term.
"""
@spec get(db_handle, key, read_options) :: {:ok, binary} | {:ok, value} | :not_found | {:error, any}
def get(db, key, read_opts \\ []) do
{ auto_decode, read_opts } = Keyword.pop(read_opts, :decode)
with {:ok, val } <- :erocksdb.get(db, key, read_opts) do
if auto_decode do
{:ok, :erlang.binary_to_term(val)}
else
{:ok, val}
end
end
end
@doc """
Creates an Elixir stream of the keys within the `db_handle`.
"""
@spec stream_keys(db_handle, read_options) :: Stream.t
def stream_keys(db, read_opts \\ []) do
Stream.resource(fn ->
{:ok, iter } = :erocksdb.iterator(db, read_opts, :keys_only)
{iter, :first}
end, fn {iter, dir} ->
case :erocksdb.iterator_move(iter, dir) do
{:ok, key} -> {[key], {iter, :next}}
{:error, :invalid_iterator} -> {:halt, iter}
end
end, fn {iter,_} ->
:erocksdb.iterator_close(iter)
end)
end
def stream(db, read_opts \\ []) do
{auto_decode, read_opts } = Keyword.pop(read_opts, :decode)
scan = fn {iter, dir} ->
case :erocksdb.iterator_move(iter, dir) do
{:ok, key, val} -> {[{key, val}], {iter, :next}}
{:error, :invalid_iterator} -> {:halt, iter}
end
end
scan_or_decode = if auto_decode do
fn arg ->
with {[{key, val}], acc } <-scan.(arg) do
val = :erlang.binary_to_term(val)
{[{key, val}], acc}
end
end
else
scan
end
Stream.resource(fn ->
{:ok, iter } = :erocksdb.iterator(db, read_opts)
{iter, :first}
end, scan_or_decode, fn {iter,_} ->
:erocksdb.iterator_close(iter)
end)
end
defp sanitize_opts(opts) do
[ raw, rest ] = Keyword.split(opts, @opts_to_convert_to_bitlists)
{ raw, rest } = Keyword.split(opts, @opts_to_convert_to_bitlists)
converted = Enum.map(raw, fn {k, val} -> {k, to_charlist(val)} end)
Keyword.merge(rest, converted)
end