From 23c36ab9f20f43d28c4b01f4b1a4bb2ee261786b Mon Sep 17 00:00:00 2001 From: Ryan Schmukler Date: Mon, 8 Aug 2016 21:25:44 -0400 Subject: [PATCH] feat(stream): add support for stream_keys/2 and stream/2 --- lib/rox.ex | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 4 deletions(-) diff --git a/lib/rox.ex b/lib/rox.ex index 0099692..259eb04 100644 --- a/lib/rox.ex +++ b/lib/rox.ex @@ -5,7 +5,7 @@ defmodule Rox do @type compression_type :: :snappy | :zlib | :bzip2 | :lz4 | :lz4h | :none @type key :: String.t | binary - @type value :: String.t | binary + @type value :: any @type db_handle :: binary @type cf_handle :: binary @@ -114,7 +114,8 @@ defmodule Rox do {:iterate_upper_bound, binary} | {:tailing, boolean} | {:total_order_seek, boolean} | - {:snapshot, snapshot_handle} + {:snapshot, snapshot_handle} | + {:decode, boolean} ] @type write_options :: [ @@ -135,7 +136,7 @@ defmodule Rox do @type iterator_action :: :first | :last | :next | :prev | binary @doc """ - Open a RocksDB with the specified read options and column family options + Open a RocksDB with the specified database options and column family options """ @spec open(path :: file_path, db_opts :: db_options, cf_opts :: cf_options) :: {:ok, db_handle} | {:error, any} @@ -185,8 +186,74 @@ defmodule Rox do def put(db, cf, key, value, write_opts), do: :erocksdb.put(db, cf, key, :erlang.term_to_binary(value), write_opts) + @doc """ + Retrieve a key/value pair in the default column family + + For non binary terms, you may use `decode: true` to automatically decode the binary back into the term. + """ + + @spec get(db_handle, key, read_options) :: {:ok, binary} | {:ok, value} | :not_found | {:error, any} + def get(db, key, read_opts \\ []) do + { auto_decode, read_opts } = Keyword.pop(read_opts, :decode) + with {:ok, val } <- :erocksdb.get(db, key, read_opts) do + if auto_decode do + {:ok, :erlang.binary_to_term(val)} + else + {:ok, val} + end + end + end + + @doc """ + Creates an Elixir stream of the keys within the `db_handle`. + """ + + @spec stream_keys(db_handle, read_options) :: Stream.t + def stream_keys(db, read_opts \\ []) do + Stream.resource(fn -> + {:ok, iter } = :erocksdb.iterator(db, read_opts, :keys_only) + {iter, :first} + end, fn {iter, dir} -> + case :erocksdb.iterator_move(iter, dir) do + {:ok, key} -> {[key], {iter, :next}} + {:error, :invalid_iterator} -> {:halt, iter} + end + end, fn {iter,_} -> + :erocksdb.iterator_close(iter) + end) + end + + def stream(db, read_opts \\ []) do + {auto_decode, read_opts } = Keyword.pop(read_opts, :decode) + + scan = fn {iter, dir} -> + case :erocksdb.iterator_move(iter, dir) do + {:ok, key, val} -> {[{key, val}], {iter, :next}} + {:error, :invalid_iterator} -> {:halt, iter} + end + end + + scan_or_decode = if auto_decode do + fn arg -> + with {[{key, val}], acc } <-scan.(arg) do + val = :erlang.binary_to_term(val) + {[{key, val}], acc} + end + end + else + scan + end + + Stream.resource(fn -> + {:ok, iter } = :erocksdb.iterator(db, read_opts) + {iter, :first} + end, scan_or_decode, fn {iter,_} -> + :erocksdb.iterator_close(iter) + end) + end + defp sanitize_opts(opts) do - [ raw, rest ] = Keyword.split(opts, @opts_to_convert_to_bitlists) + { raw, rest } = Keyword.split(opts, @opts_to_convert_to_bitlists) converted = Enum.map(raw, fn {k, val} -> {k, to_charlist(val)} end) Keyword.merge(rest, converted) end