commit
d970af8724
26
CHANGELOG.md
26
CHANGELOG.md
|
@ -1,5 +1,31 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 0.8.3
|
||||||
|
|
||||||
|
*May 21, 2018*
|
||||||
|
|
||||||
|
FEATURES:
|
||||||
|
|
||||||
|
- [common] ASCIITrim()
|
||||||
|
|
||||||
|
## 0.8.2 (April 23rd, 2018)
|
||||||
|
|
||||||
|
FEATURES:
|
||||||
|
|
||||||
|
- [pubsub] TagMap, NewTagMap
|
||||||
|
- [merkle] SimpleProofsFromMap()
|
||||||
|
- [common] IsASCIIText()
|
||||||
|
- [common] PrefixEndBytes // e.g. increment or nil
|
||||||
|
- [common] BitArray.MarshalJSON/.UnmarshalJSON
|
||||||
|
- [common] BitArray uses 'x' not 'X' for String() and above.
|
||||||
|
- [db] DebugDB shows better colorized output
|
||||||
|
|
||||||
|
BUG FIXES:
|
||||||
|
|
||||||
|
- [common] Fix TestParallelAbort nondeterministic failure #201/#202
|
||||||
|
- [db] PrefixDB Iterator/ReverseIterator fixes
|
||||||
|
- [db] DebugDB fixes
|
||||||
|
|
||||||
## 0.8.1 (April 5th, 2018)
|
## 0.8.1 (April 5th, 2018)
|
||||||
|
|
||||||
FEATURES:
|
FEATURES:
|
||||||
|
|
|
@ -32,7 +32,7 @@ type TaskResultSet struct {
|
||||||
func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
|
func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
|
||||||
return &TaskResultSet{
|
return &TaskResultSet{
|
||||||
chz: chz,
|
chz: chz,
|
||||||
results: nil,
|
results: make([]taskResultOK, len(chz)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,19 +49,21 @@ func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: Not concurrency safe.
|
// NOTE: Not concurrency safe.
|
||||||
|
// Writes results to trs.results without waiting for all tasks to complete.
|
||||||
func (trs *TaskResultSet) Reap() *TaskResultSet {
|
func (trs *TaskResultSet) Reap() *TaskResultSet {
|
||||||
if trs.results == nil {
|
|
||||||
trs.results = make([]taskResultOK, len(trs.chz))
|
|
||||||
}
|
|
||||||
for i := 0; i < len(trs.results); i++ {
|
for i := 0; i < len(trs.results); i++ {
|
||||||
var trch = trs.chz[i]
|
var trch = trs.chz[i]
|
||||||
select {
|
select {
|
||||||
case result := <-trch:
|
case result, ok := <-trch:
|
||||||
// Overwrite result.
|
if ok {
|
||||||
|
// Write result.
|
||||||
trs.results[i] = taskResultOK{
|
trs.results[i] = taskResultOK{
|
||||||
TaskResult: result,
|
TaskResult: result,
|
||||||
OK: true,
|
OK: true,
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// We already wrote it.
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
// Do nothing.
|
// Do nothing.
|
||||||
}
|
}
|
||||||
|
@ -69,6 +71,27 @@ func (trs *TaskResultSet) Reap() *TaskResultSet {
|
||||||
return trs
|
return trs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE: Not concurrency safe.
|
||||||
|
// Like Reap() but waits until all tasks have returned or panic'd.
|
||||||
|
func (trs *TaskResultSet) Wait() *TaskResultSet {
|
||||||
|
for i := 0; i < len(trs.results); i++ {
|
||||||
|
var trch = trs.chz[i]
|
||||||
|
select {
|
||||||
|
case result, ok := <-trch:
|
||||||
|
if ok {
|
||||||
|
// Write result.
|
||||||
|
trs.results[i] = taskResultOK{
|
||||||
|
TaskResult: result,
|
||||||
|
OK: true,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We already wrote it.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return trs
|
||||||
|
}
|
||||||
|
|
||||||
// Returns the firstmost (by task index) error as
|
// Returns the firstmost (by task index) error as
|
||||||
// discovered by all previous Reap() calls.
|
// discovered by all previous Reap() calls.
|
||||||
func (trs *TaskResultSet) FirstValue() interface{} {
|
func (trs *TaskResultSet) FirstValue() interface{} {
|
||||||
|
@ -116,7 +139,11 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if pnk := recover(); pnk != nil {
|
if pnk := recover(); pnk != nil {
|
||||||
atomic.AddInt32(numPanics, 1)
|
atomic.AddInt32(numPanics, 1)
|
||||||
|
// Send panic to taskResultCh.
|
||||||
taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")}
|
taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")}
|
||||||
|
// Closing taskResultCh lets trs.Wait() work.
|
||||||
|
close(taskResultCh)
|
||||||
|
// Decrement waitgroup.
|
||||||
taskDoneCh <- false
|
taskDoneCh <- false
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -125,6 +152,8 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
|
||||||
// Send val/err to taskResultCh.
|
// Send val/err to taskResultCh.
|
||||||
// NOTE: Below this line, nothing must panic/
|
// NOTE: Below this line, nothing must panic/
|
||||||
taskResultCh <- TaskResult{val, err}
|
taskResultCh <- TaskResult{val, err}
|
||||||
|
// Closing taskResultCh lets trs.Wait() work.
|
||||||
|
close(taskResultCh)
|
||||||
// Decrement waitgroup.
|
// Decrement waitgroup.
|
||||||
taskDoneCh <- abort
|
taskDoneCh <- abort
|
||||||
}(i, task, taskResultCh)
|
}(i, task, taskResultCh)
|
||||||
|
|
|
@ -91,10 +91,14 @@ func TestParallelAbort(t *testing.T) {
|
||||||
// Now let the last task (#3) complete after abort.
|
// Now let the last task (#3) complete after abort.
|
||||||
flow4 <- <-flow3
|
flow4 <- <-flow3
|
||||||
|
|
||||||
|
// Wait until all tasks have returned or panic'd.
|
||||||
|
taskResultSet.Wait()
|
||||||
|
|
||||||
// Verify task #0, #1, #2.
|
// Verify task #0, #1, #2.
|
||||||
checkResult(t, taskResultSet, 0, 0, nil, nil)
|
checkResult(t, taskResultSet, 0, 0, nil, nil)
|
||||||
checkResult(t, taskResultSet, 1, 1, errors.New("some error"), nil)
|
checkResult(t, taskResultSet, 1, 1, errors.New("some error"), nil)
|
||||||
checkResult(t, taskResultSet, 2, 2, nil, nil)
|
checkResult(t, taskResultSet, 2, 2, nil, nil)
|
||||||
|
checkResult(t, taskResultSet, 3, 3, nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParallelRecover(t *testing.T) {
|
func TestParallelRecover(t *testing.T) {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package common
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
@ -249,13 +250,14 @@ func (bA *BitArray) PickRandom() (int, bool) {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// String returns a string representation of BitArray: BA{<bit-string>},
|
||||||
|
// where <bit-string> is a sequence of 'x' (1) and '_' (0).
|
||||||
|
// The <bit-string> includes spaces and newlines to help people.
|
||||||
|
// For a simple sequence of 'x' and '_' characters with no spaces or newlines,
|
||||||
|
// see the MarshalJSON() method.
|
||||||
|
// Example: "BA{_x_}" or "nil-BitArray" for nil.
|
||||||
func (bA *BitArray) String() string {
|
func (bA *BitArray) String() string {
|
||||||
if bA == nil {
|
return bA.StringIndented("")
|
||||||
return "nil-BitArray"
|
|
||||||
}
|
|
||||||
bA.mtx.Lock()
|
|
||||||
defer bA.mtx.Unlock()
|
|
||||||
return bA.stringIndented("")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bA *BitArray) StringIndented(indent string) string {
|
func (bA *BitArray) StringIndented(indent string) string {
|
||||||
|
@ -268,12 +270,11 @@ func (bA *BitArray) StringIndented(indent string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bA *BitArray) stringIndented(indent string) string {
|
func (bA *BitArray) stringIndented(indent string) string {
|
||||||
|
|
||||||
lines := []string{}
|
lines := []string{}
|
||||||
bits := ""
|
bits := ""
|
||||||
for i := 0; i < bA.Bits; i++ {
|
for i := 0; i < bA.Bits; i++ {
|
||||||
if bA.getIndex(i) {
|
if bA.getIndex(i) {
|
||||||
bits += "X"
|
bits += "x"
|
||||||
} else {
|
} else {
|
||||||
bits += "_"
|
bits += "_"
|
||||||
}
|
}
|
||||||
|
@ -282,10 +283,10 @@ func (bA *BitArray) stringIndented(indent string) string {
|
||||||
bits = ""
|
bits = ""
|
||||||
}
|
}
|
||||||
if i%10 == 9 {
|
if i%10 == 9 {
|
||||||
bits += " "
|
bits += indent
|
||||||
}
|
}
|
||||||
if i%50 == 49 {
|
if i%50 == 49 {
|
||||||
bits += " "
|
bits += indent
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(bits) > 0 {
|
if len(bits) > 0 {
|
||||||
|
@ -320,3 +321,58 @@ func (bA *BitArray) Update(o *BitArray) {
|
||||||
|
|
||||||
copy(bA.Elems, o.Elems)
|
copy(bA.Elems, o.Elems)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MarshalJSON implements json.Marshaler interface by marshaling bit array
|
||||||
|
// using a custom format: a string of '-' or 'x' where 'x' denotes the 1 bit.
|
||||||
|
func (bA *BitArray) MarshalJSON() ([]byte, error) {
|
||||||
|
if bA == nil {
|
||||||
|
return []byte("null"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
bA.mtx.Lock()
|
||||||
|
defer bA.mtx.Unlock()
|
||||||
|
|
||||||
|
bits := `"`
|
||||||
|
for i := 0; i < bA.Bits; i++ {
|
||||||
|
if bA.getIndex(i) {
|
||||||
|
bits += `x`
|
||||||
|
} else {
|
||||||
|
bits += `_`
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bits += `"`
|
||||||
|
return []byte(bits), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var bitArrayJSONRegexp = regexp.MustCompile(`\A"([_x]*)"\z`)
|
||||||
|
|
||||||
|
// UnmarshalJSON implements json.Unmarshaler interface by unmarshaling a custom
|
||||||
|
// JSON description.
|
||||||
|
func (bA *BitArray) UnmarshalJSON(bz []byte) error {
|
||||||
|
b := string(bz)
|
||||||
|
if b == "null" {
|
||||||
|
// This is required e.g. for encoding/json when decoding
|
||||||
|
// into a pointer with pre-allocated BitArray.
|
||||||
|
bA.Bits = 0
|
||||||
|
bA.Elems = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate 'b'.
|
||||||
|
match := bitArrayJSONRegexp.FindStringSubmatch(b)
|
||||||
|
if match == nil {
|
||||||
|
return fmt.Errorf("BitArray in JSON should be a string of format %q but got %s", bitArrayJSONRegexp.String(), b)
|
||||||
|
}
|
||||||
|
bits := match[1]
|
||||||
|
|
||||||
|
// Construct new BitArray and copy over.
|
||||||
|
numBits := len(bits)
|
||||||
|
bA2 := NewBitArray(numBits)
|
||||||
|
for i := 0; i < numBits; i++ {
|
||||||
|
if bits[i] == 'x' {
|
||||||
|
bA2.SetIndex(i, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*bA = *bA2
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -2,8 +2,10 @@ package common
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -215,3 +217,51 @@ func TestNewBitArrayNeverCrashesOnNegatives(t *testing.T) {
|
||||||
_ = NewBitArray(bits)
|
_ = NewBitArray(bits)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestJSONMarshalUnmarshal(t *testing.T) {
|
||||||
|
|
||||||
|
bA1 := NewBitArray(0)
|
||||||
|
|
||||||
|
bA2 := NewBitArray(1)
|
||||||
|
|
||||||
|
bA3 := NewBitArray(1)
|
||||||
|
bA3.SetIndex(0, true)
|
||||||
|
|
||||||
|
bA4 := NewBitArray(5)
|
||||||
|
bA4.SetIndex(0, true)
|
||||||
|
bA4.SetIndex(1, true)
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
bA *BitArray
|
||||||
|
marshalledBA string
|
||||||
|
}{
|
||||||
|
{nil, `null`},
|
||||||
|
{bA1, `null`},
|
||||||
|
{bA2, `"_"`},
|
||||||
|
{bA3, `"x"`},
|
||||||
|
{bA4, `"xx___"`},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.bA.String(), func(t *testing.T) {
|
||||||
|
bz, err := json.Marshal(tc.bA)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, tc.marshalledBA, string(bz))
|
||||||
|
|
||||||
|
var unmarshalledBA *BitArray
|
||||||
|
err = json.Unmarshal(bz, &unmarshalledBA)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
if tc.bA == nil {
|
||||||
|
require.Nil(t, unmarshalledBA)
|
||||||
|
} else {
|
||||||
|
require.NotNil(t, unmarshalledBA)
|
||||||
|
assert.EqualValues(t, tc.bA.Bits, unmarshalledBA.Bits)
|
||||||
|
if assert.EqualValues(t, tc.bA.String(), unmarshalledBA.String()) {
|
||||||
|
assert.EqualValues(t, tc.bA.Elems, unmarshalledBA.Elems)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -45,3 +45,29 @@ func TrimmedString(b []byte) string {
|
||||||
return string(bytes.TrimLeft(b, trimSet))
|
return string(bytes.TrimLeft(b, trimSet))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrefixEndBytes returns the end byteslice for a noninclusive range
|
||||||
|
// that would include all byte slices for which the input is the prefix
|
||||||
|
func PrefixEndBytes(prefix []byte) []byte {
|
||||||
|
if prefix == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
end := make([]byte, len(prefix))
|
||||||
|
copy(end, prefix)
|
||||||
|
finished := false
|
||||||
|
|
||||||
|
for !finished {
|
||||||
|
if end[len(end)-1] != byte(255) {
|
||||||
|
end[len(end)-1]++
|
||||||
|
finished = true
|
||||||
|
} else {
|
||||||
|
end = end[:len(end)-1]
|
||||||
|
if len(end) == 0 {
|
||||||
|
end = nil
|
||||||
|
finished = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return end
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPrefixEndBytes(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
var testCases = []struct {
|
||||||
|
prefix []byte
|
||||||
|
expected []byte
|
||||||
|
}{
|
||||||
|
{[]byte{byte(55), byte(255), byte(255), byte(0)}, []byte{byte(55), byte(255), byte(255), byte(1)}},
|
||||||
|
{[]byte{byte(55), byte(255), byte(255), byte(15)}, []byte{byte(55), byte(255), byte(255), byte(16)}},
|
||||||
|
{[]byte{byte(55), byte(200), byte(255)}, []byte{byte(55), byte(201)}},
|
||||||
|
{[]byte{byte(55), byte(255), byte(255)}, []byte{byte(56)}},
|
||||||
|
{[]byte{byte(255), byte(255), byte(255)}, nil},
|
||||||
|
{nil, nil},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
end := PrefixEndBytes(test.prefix)
|
||||||
|
assert.Equal(test.expected, end)
|
||||||
|
}
|
||||||
|
}
|
|
@ -178,7 +178,7 @@ func (err *cmnError) Format(s fmt.State, verb rune) {
|
||||||
if s.Flag('#') {
|
if s.Flag('#') {
|
||||||
s.Write([]byte("--= Error =--\n"))
|
s.Write([]byte("--= Error =--\n"))
|
||||||
// Write msg.
|
// Write msg.
|
||||||
s.Write([]byte(fmt.Sprintf("Message: %#s\n", err.msg)))
|
s.Write([]byte(fmt.Sprintf("Message: %s\n", err.msg)))
|
||||||
// Write cause.
|
// Write cause.
|
||||||
s.Write([]byte(fmt.Sprintf("Cause: %#v\n", err.cause)))
|
s.Write([]byte(fmt.Sprintf("Cause: %#v\n", err.cause)))
|
||||||
// Write type.
|
// Write type.
|
||||||
|
|
|
@ -23,11 +23,11 @@ func TestWriteFileAtomic(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer os.Remove(f.Name())
|
defer os.Remove(f.Name())
|
||||||
|
|
||||||
if err := ioutil.WriteFile(f.Name(), old, 0664); err != nil {
|
if err = ioutil.WriteFile(f.Name(), old, 0664); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := WriteFileAtomic(f.Name(), data, perm); err != nil {
|
if err = WriteFileAtomic(f.Name(), data, perm); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,3 +57,33 @@ func SplitAndTrim(s, sep, cutset string) []string {
|
||||||
}
|
}
|
||||||
return spl
|
return spl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if s is a non-empty printable non-tab ascii character.
|
||||||
|
func IsASCIIText(s string) bool {
|
||||||
|
if len(s) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, b := range []byte(s) {
|
||||||
|
if 32 <= b && b <= 126 {
|
||||||
|
// good
|
||||||
|
} else {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: Assumes that s is ASCII as per IsASCIIText(), otherwise panics.
|
||||||
|
func ASCIITrim(s string) string {
|
||||||
|
r := make([]byte, 0, len(s))
|
||||||
|
for _, b := range []byte(s) {
|
||||||
|
if b == 32 {
|
||||||
|
continue // skip space
|
||||||
|
} else if 32 < b && b <= 126 {
|
||||||
|
r = append(r, b)
|
||||||
|
} else {
|
||||||
|
panic(fmt.Sprintf("non-ASCII (non-tab) char 0x%X", b))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return string(r)
|
||||||
|
}
|
||||||
|
|
|
@ -49,3 +49,26 @@ func TestSplitAndTrim(t *testing.T) {
|
||||||
assert.Equal(t, tc.expected, SplitAndTrim(tc.s, tc.sep, tc.cutset), "%s", tc.s)
|
assert.Equal(t, tc.expected, SplitAndTrim(tc.s, tc.sep, tc.cutset), "%s", tc.s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIsASCIIText(t *testing.T) {
|
||||||
|
notASCIIText := []string{
|
||||||
|
"", "\xC2", "\xC2\xA2", "\xFF", "\x80", "\xF0", "\n", "\t",
|
||||||
|
}
|
||||||
|
for _, v := range notASCIIText {
|
||||||
|
assert.False(t, IsHex(v), "%q is not ascii-text", v)
|
||||||
|
}
|
||||||
|
asciiText := []string{
|
||||||
|
" ", ".", "x", "$", "_", "abcdefg;", "-", "0x00", "0", "123",
|
||||||
|
}
|
||||||
|
for _, v := range asciiText {
|
||||||
|
assert.True(t, IsASCIIText(v), "%q is ascii-text", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestASCIITrim(t *testing.T) {
|
||||||
|
assert.Equal(t, ASCIITrim(" "), "")
|
||||||
|
assert.Equal(t, ASCIITrim(" a"), "a")
|
||||||
|
assert.Equal(t, ASCIITrim("a "), "a")
|
||||||
|
assert.Equal(t, ASCIITrim(" a "), "a")
|
||||||
|
assert.Panics(t, func() { ASCIITrim("\xC2\xA2") })
|
||||||
|
}
|
||||||
|
|
|
@ -33,6 +33,12 @@ func checkNextPanics(t *testing.T, itr Iterator) {
|
||||||
assert.Panics(t, func() { itr.Next() }, "checkNextPanics expected panic but didn't")
|
assert.Panics(t, func() { itr.Next() }, "checkNextPanics expected panic but didn't")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkDomain(t *testing.T, itr Iterator, start, end []byte) {
|
||||||
|
ds, de := itr.Domain()
|
||||||
|
assert.Equal(t, start, ds, "checkDomain domain start incorrect")
|
||||||
|
assert.Equal(t, end, de, "checkDomain domain end incorrect")
|
||||||
|
}
|
||||||
|
|
||||||
func checkItem(t *testing.T, itr Iterator, key []byte, value []byte) {
|
func checkItem(t *testing.T, itr Iterator, key []byte, value []byte) {
|
||||||
k, v := itr.Key(), itr.Value()
|
k, v := itr.Key(), itr.Value()
|
||||||
assert.Exactly(t, key, k)
|
assert.Exactly(t, key, k)
|
||||||
|
|
|
@ -3,8 +3,14 @@ package db
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func _fmt(f string, az ...interface{}) string {
|
||||||
|
return fmt.Sprintf(f, az...)
|
||||||
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
// debugDB
|
// debugDB
|
||||||
|
|
||||||
|
@ -26,78 +32,84 @@ func (ddb debugDB) Mutex() *sync.Mutex { return nil }
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) Get(key []byte) (value []byte) {
|
func (ddb debugDB) Get(key []byte) (value []byte) {
|
||||||
defer fmt.Printf("%v.Get(%X) %X\n", ddb.label, key, value)
|
defer func() {
|
||||||
|
fmt.Printf("%v.Get(%v) %v\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Blue(_fmt("%X", value)))
|
||||||
|
}()
|
||||||
value = ddb.db.Get(key)
|
value = ddb.db.Get(key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) Has(key []byte) (has bool) {
|
func (ddb debugDB) Has(key []byte) (has bool) {
|
||||||
defer fmt.Printf("%v.Has(%X) %v\n", ddb.label, key, has)
|
defer func() {
|
||||||
|
fmt.Printf("%v.Has(%v) %v\n", ddb.label, cmn.Cyan(_fmt("%X", key)), has)
|
||||||
|
}()
|
||||||
return ddb.db.Has(key)
|
return ddb.db.Has(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) Set(key []byte, value []byte) {
|
func (ddb debugDB) Set(key []byte, value []byte) {
|
||||||
fmt.Printf("%v.Set(%X, %X)\n", ddb.label, key, value)
|
fmt.Printf("%v.Set(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value)))
|
||||||
ddb.db.Set(key, value)
|
ddb.db.Set(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) SetSync(key []byte, value []byte) {
|
func (ddb debugDB) SetSync(key []byte, value []byte) {
|
||||||
fmt.Printf("%v.SetSync(%X, %X)\n", ddb.label, key, value)
|
fmt.Printf("%v.SetSync(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value)))
|
||||||
ddb.db.SetSync(key, value)
|
ddb.db.SetSync(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
// Implements atomicSetDeleter.
|
||||||
func (ddb debugDB) SetNoLock(key []byte, value []byte) {
|
func (ddb debugDB) SetNoLock(key []byte, value []byte) {
|
||||||
fmt.Printf("%v.SetNoLock(%X, %X)\n", ddb.label, key, value)
|
fmt.Printf("%v.SetNoLock(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value)))
|
||||||
ddb.db.Set(key, value)
|
ddb.db.(atomicSetDeleter).SetNoLock(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
// Implements atomicSetDeleter.
|
||||||
func (ddb debugDB) SetNoLockSync(key []byte, value []byte) {
|
func (ddb debugDB) SetNoLockSync(key []byte, value []byte) {
|
||||||
fmt.Printf("%v.SetNoLockSync(%X, %X)\n", ddb.label, key, value)
|
fmt.Printf("%v.SetNoLockSync(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value)))
|
||||||
ddb.db.SetSync(key, value)
|
ddb.db.(atomicSetDeleter).SetNoLockSync(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) Delete(key []byte) {
|
func (ddb debugDB) Delete(key []byte) {
|
||||||
fmt.Printf("%v.Delete(%X)\n", ddb.label, key)
|
fmt.Printf("%v.Delete(%v)\n", ddb.label, cmn.Red(_fmt("%X", key)))
|
||||||
ddb.db.Delete(key)
|
ddb.db.Delete(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) DeleteSync(key []byte) {
|
func (ddb debugDB) DeleteSync(key []byte) {
|
||||||
fmt.Printf("%v.DeleteSync(%X)\n", ddb.label, key)
|
fmt.Printf("%v.DeleteSync(%v)\n", ddb.label, cmn.Red(_fmt("%X", key)))
|
||||||
ddb.db.DeleteSync(key)
|
ddb.db.DeleteSync(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
// Implements atomicSetDeleter.
|
||||||
func (ddb debugDB) DeleteNoLock(key []byte) {
|
func (ddb debugDB) DeleteNoLock(key []byte) {
|
||||||
fmt.Printf("%v.DeleteNoLock(%X)\n", ddb.label, key)
|
fmt.Printf("%v.DeleteNoLock(%v)\n", ddb.label, cmn.Red(_fmt("%X", key)))
|
||||||
ddb.db.Delete(key)
|
ddb.db.(atomicSetDeleter).DeleteNoLock(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
// Implements atomicSetDeleter.
|
||||||
func (ddb debugDB) DeleteNoLockSync(key []byte) {
|
func (ddb debugDB) DeleteNoLockSync(key []byte) {
|
||||||
fmt.Printf("%v.DeleteNoLockSync(%X)\n", ddb.label, key)
|
fmt.Printf("%v.DeleteNoLockSync(%v)\n", ddb.label, cmn.Red(_fmt("%X", key)))
|
||||||
ddb.db.DeleteSync(key)
|
ddb.db.(atomicSetDeleter).DeleteNoLockSync(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) Iterator(start, end []byte) Iterator {
|
func (ddb debugDB) Iterator(start, end []byte) Iterator {
|
||||||
fmt.Printf("%v.Iterator(%X, %X)\n", ddb.label, start, end)
|
fmt.Printf("%v.Iterator(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", start)), cmn.Blue(_fmt("%X", end)))
|
||||||
return NewDebugIterator(ddb.label, ddb.db.Iterator(start, end))
|
return NewDebugIterator(ddb.label, ddb.db.Iterator(start, end))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (ddb debugDB) ReverseIterator(start, end []byte) Iterator {
|
func (ddb debugDB) ReverseIterator(start, end []byte) Iterator {
|
||||||
fmt.Printf("%v.ReverseIterator(%X, %X)\n", ddb.label, start, end)
|
fmt.Printf("%v.ReverseIterator(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", start)), cmn.Blue(_fmt("%X", end)))
|
||||||
return NewDebugIterator(ddb.label, ddb.db.ReverseIterator(start, end))
|
return NewDebugIterator(ddb.label, ddb.db.ReverseIterator(start, end))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
|
// Panics if the underlying db is not an
|
||||||
|
// atomicSetDeleter.
|
||||||
func (ddb debugDB) NewBatch() Batch {
|
func (ddb debugDB) NewBatch() Batch {
|
||||||
fmt.Printf("%v.NewBatch()\n", ddb.label)
|
fmt.Printf("%v.NewBatch()\n", ddb.label)
|
||||||
return NewDebugBatch(ddb.label, ddb.db.NewBatch())
|
return NewDebugBatch(ddb.label, ddb.db.NewBatch())
|
||||||
|
@ -137,14 +149,18 @@ func NewDebugIterator(label string, itr Iterator) debugIterator {
|
||||||
|
|
||||||
// Implements Iterator.
|
// Implements Iterator.
|
||||||
func (ditr debugIterator) Domain() (start []byte, end []byte) {
|
func (ditr debugIterator) Domain() (start []byte, end []byte) {
|
||||||
defer fmt.Printf("%v.itr.Domain() (%X,%X)\n", ditr.label, start, end)
|
defer func() {
|
||||||
|
fmt.Printf("%v.itr.Domain() (%X,%X)\n", ditr.label, start, end)
|
||||||
|
}()
|
||||||
start, end = ditr.itr.Domain()
|
start, end = ditr.itr.Domain()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements Iterator.
|
// Implements Iterator.
|
||||||
func (ditr debugIterator) Valid() (ok bool) {
|
func (ditr debugIterator) Valid() (ok bool) {
|
||||||
defer fmt.Printf("%v.itr.Valid() %v\n", ditr.label, ok)
|
defer func() {
|
||||||
|
fmt.Printf("%v.itr.Valid() %v\n", ditr.label, ok)
|
||||||
|
}()
|
||||||
ok = ditr.itr.Valid()
|
ok = ditr.itr.Valid()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -157,14 +173,14 @@ func (ditr debugIterator) Next() {
|
||||||
|
|
||||||
// Implements Iterator.
|
// Implements Iterator.
|
||||||
func (ditr debugIterator) Key() (key []byte) {
|
func (ditr debugIterator) Key() (key []byte) {
|
||||||
fmt.Printf("%v.itr.Key() %X\n", ditr.label, key)
|
fmt.Printf("%v.itr.Key() %v\n", ditr.label, cmn.Cyan(_fmt("%X", key)))
|
||||||
key = ditr.itr.Key()
|
key = ditr.itr.Key()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements Iterator.
|
// Implements Iterator.
|
||||||
func (ditr debugIterator) Value() (value []byte) {
|
func (ditr debugIterator) Value() (value []byte) {
|
||||||
fmt.Printf("%v.itr.Value() %X\n", ditr.label, value)
|
fmt.Printf("%v.itr.Value() %v\n", ditr.label, cmn.Blue(_fmt("%X", value)))
|
||||||
value = ditr.itr.Value()
|
value = ditr.itr.Value()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -193,13 +209,13 @@ func NewDebugBatch(label string, bch Batch) debugBatch {
|
||||||
|
|
||||||
// Implements Batch.
|
// Implements Batch.
|
||||||
func (dbch debugBatch) Set(key, value []byte) {
|
func (dbch debugBatch) Set(key, value []byte) {
|
||||||
fmt.Printf("%v.batch.Set(%X, %X)\n", dbch.label, key, value)
|
fmt.Printf("%v.batch.Set(%v, %v)\n", dbch.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value)))
|
||||||
dbch.bch.Set(key, value)
|
dbch.bch.Set(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements Batch.
|
// Implements Batch.
|
||||||
func (dbch debugBatch) Delete(key []byte) {
|
func (dbch debugBatch) Delete(key []byte) {
|
||||||
fmt.Printf("%v.batch.Delete(%X)\n", dbch.label, key)
|
fmt.Printf("%v.batch.Delete(%v)\n", dbch.label, cmn.Red(_fmt("%X", key)))
|
||||||
dbch.bch.Delete(key)
|
dbch.bch.Delete(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package db
|
package db
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
type atomicSetDeleter interface {
|
type atomicSetDeleter interface {
|
||||||
Mutex() *sync.Mutex
|
Mutex() *sync.Mutex
|
||||||
|
@ -66,6 +68,5 @@ func (mBatch *memBatch) write(doSync bool) {
|
||||||
case opTypeDelete:
|
case opTypeDelete:
|
||||||
mBatch.db.DeleteNoLock(op.key)
|
mBatch.db.DeleteNoLock(op.key)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
10
db/mem_db.go
10
db/mem_db.go
|
@ -37,7 +37,8 @@ func (db *MemDB) Get(key []byte) []byte {
|
||||||
defer db.mtx.Unlock()
|
defer db.mtx.Unlock()
|
||||||
key = nonNilBytes(key)
|
key = nonNilBytes(key)
|
||||||
|
|
||||||
return db.db[string(key)]
|
value := db.db[string(key)]
|
||||||
|
return value
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
|
@ -162,7 +163,7 @@ func (db *MemDB) ReverseIterator(start, end []byte) Iterator {
|
||||||
db.mtx.Lock()
|
db.mtx.Lock()
|
||||||
defer db.mtx.Unlock()
|
defer db.mtx.Unlock()
|
||||||
|
|
||||||
keys := db.getSortedKeys(end, start, true)
|
keys := db.getSortedKeys(start, end, true)
|
||||||
return newMemDBIterator(db, keys, start, end)
|
return newMemDBIterator(db, keys, start, end)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,7 +237,8 @@ func (itr *memDBIterator) assertIsValid() {
|
||||||
func (db *MemDB) getSortedKeys(start, end []byte, reverse bool) []string {
|
func (db *MemDB) getSortedKeys(start, end []byte, reverse bool) []string {
|
||||||
keys := []string{}
|
keys := []string{}
|
||||||
for key := range db.db {
|
for key := range db.db {
|
||||||
if IsKeyInDomain([]byte(key), start, end, false) {
|
inDomain := IsKeyInDomain([]byte(key), start, end, reverse)
|
||||||
|
if inDomain {
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -244,7 +246,9 @@ func (db *MemDB) getSortedKeys(start, end []byte, reverse bool) []string {
|
||||||
if reverse {
|
if reverse {
|
||||||
nkeys := len(keys)
|
nkeys := len(keys)
|
||||||
for i := 0; i < nkeys/2; i++ {
|
for i := 0; i < nkeys/2; i++ {
|
||||||
|
temp := keys[i]
|
||||||
keys[i] = keys[nkeys-i-1]
|
keys[i] = keys[nkeys-i-1]
|
||||||
|
keys[nkeys-i-1] = temp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return keys
|
return keys
|
||||||
|
|
214
db/prefix_db.go
214
db/prefix_db.go
|
@ -24,7 +24,8 @@ func IteratePrefix(db DB, prefix []byte) Iterator {
|
||||||
TODO: Make test, maybe rename.
|
TODO: Make test, maybe rename.
|
||||||
// Like IteratePrefix but the iterator strips the prefix from the keys.
|
// Like IteratePrefix but the iterator strips the prefix from the keys.
|
||||||
func IteratePrefixStripped(db DB, prefix []byte) Iterator {
|
func IteratePrefixStripped(db DB, prefix []byte) Iterator {
|
||||||
return newUnprefixIterator(prefix, IteratePrefix(db, prefix))
|
start, end := ...
|
||||||
|
return newPrefixIterator(prefix, start, end, IteratePrefix(db, prefix))
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -55,7 +56,9 @@ func (pdb *prefixDB) Get(key []byte) []byte {
|
||||||
pdb.mtx.Lock()
|
pdb.mtx.Lock()
|
||||||
defer pdb.mtx.Unlock()
|
defer pdb.mtx.Unlock()
|
||||||
|
|
||||||
return pdb.db.Get(pdb.prefixed(key))
|
pkey := pdb.prefixed(key)
|
||||||
|
value := pdb.db.Get(pkey)
|
||||||
|
return value
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
|
@ -71,7 +74,8 @@ func (pdb *prefixDB) Set(key []byte, value []byte) {
|
||||||
pdb.mtx.Lock()
|
pdb.mtx.Lock()
|
||||||
defer pdb.mtx.Unlock()
|
defer pdb.mtx.Unlock()
|
||||||
|
|
||||||
pdb.db.Set(pdb.prefixed(key), value)
|
pkey := pdb.prefixed(key)
|
||||||
|
pdb.db.Set(pkey, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
|
@ -82,16 +86,6 @@ func (pdb *prefixDB) SetSync(key []byte, value []byte) {
|
||||||
pdb.db.SetSync(pdb.prefixed(key), value)
|
pdb.db.SetSync(pdb.prefixed(key), value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
|
||||||
func (pdb *prefixDB) SetNoLock(key []byte, value []byte) {
|
|
||||||
pdb.db.Set(pdb.prefixed(key), value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
|
||||||
func (pdb *prefixDB) SetNoLockSync(key []byte, value []byte) {
|
|
||||||
pdb.db.SetSync(pdb.prefixed(key), value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (pdb *prefixDB) Delete(key []byte) {
|
func (pdb *prefixDB) Delete(key []byte) {
|
||||||
pdb.mtx.Lock()
|
pdb.mtx.Lock()
|
||||||
|
@ -108,28 +102,22 @@ func (pdb *prefixDB) DeleteSync(key []byte) {
|
||||||
pdb.db.DeleteSync(pdb.prefixed(key))
|
pdb.db.DeleteSync(pdb.prefixed(key))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
|
||||||
func (pdb *prefixDB) DeleteNoLock(key []byte) {
|
|
||||||
pdb.db.Delete(pdb.prefixed(key))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements atomicSetDeleter.
|
|
||||||
func (pdb *prefixDB) DeleteNoLockSync(key []byte) {
|
|
||||||
pdb.db.DeleteSync(pdb.prefixed(key))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (pdb *prefixDB) Iterator(start, end []byte) Iterator {
|
func (pdb *prefixDB) Iterator(start, end []byte) Iterator {
|
||||||
pdb.mtx.Lock()
|
pdb.mtx.Lock()
|
||||||
defer pdb.mtx.Unlock()
|
defer pdb.mtx.Unlock()
|
||||||
|
|
||||||
pstart := append(pdb.prefix, start...)
|
var pstart, pend []byte
|
||||||
pend := []byte(nil)
|
pstart = append(cp(pdb.prefix), start...)
|
||||||
if end != nil {
|
if end == nil {
|
||||||
pend = append(pdb.prefix, end...)
|
pend = cpIncr(pdb.prefix)
|
||||||
|
} else {
|
||||||
|
pend = append(cp(pdb.prefix), end...)
|
||||||
}
|
}
|
||||||
return newUnprefixIterator(
|
return newPrefixIterator(
|
||||||
pdb.prefix,
|
pdb.prefix,
|
||||||
|
start,
|
||||||
|
end,
|
||||||
pdb.db.Iterator(
|
pdb.db.Iterator(
|
||||||
pstart,
|
pstart,
|
||||||
pend,
|
pend,
|
||||||
|
@ -142,31 +130,68 @@ func (pdb *prefixDB) ReverseIterator(start, end []byte) Iterator {
|
||||||
pdb.mtx.Lock()
|
pdb.mtx.Lock()
|
||||||
defer pdb.mtx.Unlock()
|
defer pdb.mtx.Unlock()
|
||||||
|
|
||||||
pstart := []byte(nil)
|
var pstart, pend []byte
|
||||||
if start != nil {
|
if start == nil {
|
||||||
pstart = append(pdb.prefix, start...)
|
// This may cause the underlying iterator to start with
|
||||||
|
// an item which doesn't start with prefix. We will skip
|
||||||
|
// that item later in this function. See 'skipOne'.
|
||||||
|
pstart = cpIncr(pdb.prefix)
|
||||||
|
} else {
|
||||||
|
pstart = append(cp(pdb.prefix), start...)
|
||||||
}
|
}
|
||||||
pend := []byte(nil)
|
if end == nil {
|
||||||
if end != nil {
|
// This may cause the underlying iterator to end with an
|
||||||
pend = append(pdb.prefix, end...)
|
// item which doesn't start with prefix. The
|
||||||
|
// prefixIterator will terminate iteration
|
||||||
|
// automatically upon detecting this.
|
||||||
|
pend = cpDecr(pdb.prefix)
|
||||||
|
} else {
|
||||||
|
pend = append(cp(pdb.prefix), end...)
|
||||||
}
|
}
|
||||||
return newUnprefixIterator(
|
ritr := pdb.db.ReverseIterator(pstart, pend)
|
||||||
|
if start == nil {
|
||||||
|
skipOne(ritr, cpIncr(pdb.prefix))
|
||||||
|
}
|
||||||
|
return newPrefixIterator(
|
||||||
pdb.prefix,
|
pdb.prefix,
|
||||||
pdb.db.ReverseIterator(
|
start,
|
||||||
pstart,
|
end,
|
||||||
pend,
|
ritr,
|
||||||
),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
|
// Panics if the underlying DB is not an
|
||||||
|
// atomicSetDeleter.
|
||||||
func (pdb *prefixDB) NewBatch() Batch {
|
func (pdb *prefixDB) NewBatch() Batch {
|
||||||
pdb.mtx.Lock()
|
pdb.mtx.Lock()
|
||||||
defer pdb.mtx.Unlock()
|
defer pdb.mtx.Unlock()
|
||||||
|
|
||||||
return &memBatch{pdb, nil}
|
return newPrefixBatch(pdb.prefix, pdb.db.NewBatch())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* NOTE: Uncomment to use memBatch instead of prefixBatch
|
||||||
|
// Implements atomicSetDeleter.
|
||||||
|
func (pdb *prefixDB) SetNoLock(key []byte, value []byte) {
|
||||||
|
pdb.db.(atomicSetDeleter).SetNoLock(pdb.prefixed(key), value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements atomicSetDeleter.
|
||||||
|
func (pdb *prefixDB) SetNoLockSync(key []byte, value []byte) {
|
||||||
|
pdb.db.(atomicSetDeleter).SetNoLockSync(pdb.prefixed(key), value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements atomicSetDeleter.
|
||||||
|
func (pdb *prefixDB) DeleteNoLock(key []byte) {
|
||||||
|
pdb.db.(atomicSetDeleter).DeleteNoLock(pdb.prefixed(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements atomicSetDeleter.
|
||||||
|
func (pdb *prefixDB) DeleteNoLockSync(key []byte) {
|
||||||
|
pdb.db.(atomicSetDeleter).DeleteNoLockSync(pdb.prefixed(key))
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
// Implements DB.
|
// Implements DB.
|
||||||
func (pdb *prefixDB) Close() {
|
func (pdb *prefixDB) Close() {
|
||||||
pdb.mtx.Lock()
|
pdb.mtx.Lock()
|
||||||
|
@ -201,52 +226,109 @@ func (pdb *prefixDB) Stats() map[string]string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pdb *prefixDB) prefixed(key []byte) []byte {
|
func (pdb *prefixDB) prefixed(key []byte) []byte {
|
||||||
return append(pdb.prefix, key...)
|
return append(cp(pdb.prefix), key...)
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
// prefixBatch
|
||||||
|
|
||||||
// Strips prefix while iterating from Iterator.
|
type prefixBatch struct {
|
||||||
type unprefixIterator struct {
|
|
||||||
prefix []byte
|
prefix []byte
|
||||||
source Iterator
|
source Batch
|
||||||
}
|
}
|
||||||
|
|
||||||
func newUnprefixIterator(prefix []byte, source Iterator) unprefixIterator {
|
func newPrefixBatch(prefix []byte, source Batch) prefixBatch {
|
||||||
return unprefixIterator{
|
return prefixBatch{
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
source: source,
|
source: source,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (itr unprefixIterator) Domain() (start []byte, end []byte) {
|
func (pb prefixBatch) Set(key, value []byte) {
|
||||||
start, end = itr.source.Domain()
|
pkey := append(cp(pb.prefix), key...)
|
||||||
if len(start) > 0 {
|
pb.source.Set(pkey, value)
|
||||||
start = stripPrefix(start, itr.prefix)
|
|
||||||
}
|
}
|
||||||
if len(end) > 0 {
|
|
||||||
end = stripPrefix(end, itr.prefix)
|
func (pb prefixBatch) Delete(key []byte) {
|
||||||
|
pkey := append(cp(pb.prefix), key...)
|
||||||
|
pb.source.Delete(pkey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pb prefixBatch) Write() {
|
||||||
|
pb.source.Write()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb prefixBatch) WriteSync() {
|
||||||
|
pb.source.WriteSync()
|
||||||
|
}
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
// prefixIterator
|
||||||
|
|
||||||
|
// Strips prefix while iterating from Iterator.
|
||||||
|
type prefixIterator struct {
|
||||||
|
prefix []byte
|
||||||
|
start []byte
|
||||||
|
end []byte
|
||||||
|
source Iterator
|
||||||
|
valid bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPrefixIterator(prefix, start, end []byte, source Iterator) prefixIterator {
|
||||||
|
if !source.Valid() || !bytes.HasPrefix(source.Key(), prefix) {
|
||||||
|
return prefixIterator{
|
||||||
|
prefix: prefix,
|
||||||
|
start: start,
|
||||||
|
end: end,
|
||||||
|
source: source,
|
||||||
|
valid: false,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return prefixIterator{
|
||||||
|
prefix: prefix,
|
||||||
|
start: start,
|
||||||
|
end: end,
|
||||||
|
source: source,
|
||||||
|
valid: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (itr prefixIterator) Domain() (start []byte, end []byte) {
|
||||||
|
return itr.start, itr.end
|
||||||
|
}
|
||||||
|
|
||||||
|
func (itr prefixIterator) Valid() bool {
|
||||||
|
return itr.valid && itr.source.Valid()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (itr prefixIterator) Next() {
|
||||||
|
if !itr.valid {
|
||||||
|
panic("prefixIterator invalid, cannot call Next()")
|
||||||
|
}
|
||||||
|
itr.source.Next()
|
||||||
|
if !itr.source.Valid() || !bytes.HasPrefix(itr.source.Key(), itr.prefix) {
|
||||||
|
itr.source.Close()
|
||||||
|
itr.valid = false
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (itr unprefixIterator) Valid() bool {
|
|
||||||
return itr.source.Valid()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (itr unprefixIterator) Next() {
|
func (itr prefixIterator) Key() (key []byte) {
|
||||||
itr.source.Next()
|
if !itr.valid {
|
||||||
|
panic("prefixIterator invalid, cannot call Key()")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (itr unprefixIterator) Key() (key []byte) {
|
|
||||||
return stripPrefix(itr.source.Key(), itr.prefix)
|
return stripPrefix(itr.source.Key(), itr.prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (itr unprefixIterator) Value() (value []byte) {
|
func (itr prefixIterator) Value() (value []byte) {
|
||||||
|
if !itr.valid {
|
||||||
|
panic("prefixIterator invalid, cannot call Value()")
|
||||||
|
}
|
||||||
return itr.source.Value()
|
return itr.source.Value()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (itr unprefixIterator) Close() {
|
func (itr prefixIterator) Close() {
|
||||||
itr.source.Close()
|
itr.source.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,3 +343,13 @@ func stripPrefix(key []byte, prefix []byte) (stripped []byte) {
|
||||||
}
|
}
|
||||||
return key[len(prefix):]
|
return key[len(prefix):]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the first iterator item is skipKey, then
|
||||||
|
// skip it.
|
||||||
|
func skipOne(itr Iterator, skipKey []byte) {
|
||||||
|
if itr.Valid() {
|
||||||
|
if bytes.Equal(itr.Key(), skipKey) {
|
||||||
|
itr.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ package db
|
||||||
|
|
||||||
import "testing"
|
import "testing"
|
||||||
|
|
||||||
func TestIteratePrefix(t *testing.T) {
|
func mockDBWithStuff() DB {
|
||||||
db := NewMemDB()
|
db := NewMemDB()
|
||||||
// Under "key" prefix
|
// Under "key" prefix
|
||||||
db.Set(bz("key"), bz("value"))
|
db.Set(bz("key"), bz("value"))
|
||||||
|
@ -14,10 +14,13 @@ func TestIteratePrefix(t *testing.T) {
|
||||||
db.Set(bz("k"), bz("val"))
|
db.Set(bz("k"), bz("val"))
|
||||||
db.Set(bz("ke"), bz("valu"))
|
db.Set(bz("ke"), bz("valu"))
|
||||||
db.Set(bz("kee"), bz("valuu"))
|
db.Set(bz("kee"), bz("valuu"))
|
||||||
xitr := db.Iterator(nil, nil)
|
return db
|
||||||
xitr.Key()
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBSimple(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
pdb := NewPrefixDB(db, bz("key"))
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
checkValue(t, pdb, bz("key"), nil)
|
checkValue(t, pdb, bz("key"), nil)
|
||||||
checkValue(t, pdb, bz(""), bz("value"))
|
checkValue(t, pdb, bz(""), bz("value"))
|
||||||
checkValue(t, pdb, bz("key1"), nil)
|
checkValue(t, pdb, bz("key1"), nil)
|
||||||
|
@ -30,9 +33,14 @@ func TestIteratePrefix(t *testing.T) {
|
||||||
checkValue(t, pdb, bz("k"), nil)
|
checkValue(t, pdb, bz("k"), nil)
|
||||||
checkValue(t, pdb, bz("ke"), nil)
|
checkValue(t, pdb, bz("ke"), nil)
|
||||||
checkValue(t, pdb, bz("kee"), nil)
|
checkValue(t, pdb, bz("kee"), nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBIterator1(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
itr := pdb.Iterator(nil, nil)
|
itr := pdb.Iterator(nil, nil)
|
||||||
itr.Key()
|
checkDomain(t, itr, nil, nil)
|
||||||
checkItem(t, itr, bz(""), bz("value"))
|
checkItem(t, itr, bz(""), bz("value"))
|
||||||
checkNext(t, itr, true)
|
checkNext(t, itr, true)
|
||||||
checkItem(t, itr, bz("1"), bz("value1"))
|
checkItem(t, itr, bz("1"), bz("value1"))
|
||||||
|
@ -40,5 +48,100 @@ func TestIteratePrefix(t *testing.T) {
|
||||||
checkItem(t, itr, bz("2"), bz("value2"))
|
checkItem(t, itr, bz("2"), bz("value2"))
|
||||||
checkNext(t, itr, true)
|
checkNext(t, itr, true)
|
||||||
checkItem(t, itr, bz("3"), bz("value3"))
|
checkItem(t, itr, bz("3"), bz("value3"))
|
||||||
|
checkNext(t, itr, false)
|
||||||
|
checkInvalid(t, itr)
|
||||||
|
itr.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBIterator2(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
|
itr := pdb.Iterator(nil, bz(""))
|
||||||
|
checkDomain(t, itr, nil, bz(""))
|
||||||
|
checkInvalid(t, itr)
|
||||||
|
itr.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBIterator3(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
|
itr := pdb.Iterator(bz(""), nil)
|
||||||
|
checkDomain(t, itr, bz(""), nil)
|
||||||
|
checkItem(t, itr, bz(""), bz("value"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz("1"), bz("value1"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz("2"), bz("value2"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz("3"), bz("value3"))
|
||||||
|
checkNext(t, itr, false)
|
||||||
|
checkInvalid(t, itr)
|
||||||
|
itr.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBIterator4(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
|
itr := pdb.Iterator(bz(""), bz(""))
|
||||||
|
checkDomain(t, itr, bz(""), bz(""))
|
||||||
|
checkInvalid(t, itr)
|
||||||
|
itr.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBReverseIterator1(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
|
itr := pdb.ReverseIterator(nil, nil)
|
||||||
|
checkDomain(t, itr, nil, nil)
|
||||||
|
checkItem(t, itr, bz("3"), bz("value3"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz("2"), bz("value2"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz("1"), bz("value1"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz(""), bz("value"))
|
||||||
|
checkNext(t, itr, false)
|
||||||
|
checkInvalid(t, itr)
|
||||||
|
itr.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBReverseIterator2(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
|
itr := pdb.ReverseIterator(nil, bz(""))
|
||||||
|
checkDomain(t, itr, nil, bz(""))
|
||||||
|
checkItem(t, itr, bz("3"), bz("value3"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz("2"), bz("value2"))
|
||||||
|
checkNext(t, itr, true)
|
||||||
|
checkItem(t, itr, bz("1"), bz("value1"))
|
||||||
|
checkNext(t, itr, false)
|
||||||
|
checkInvalid(t, itr)
|
||||||
|
itr.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBReverseIterator3(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
|
itr := pdb.ReverseIterator(bz(""), nil)
|
||||||
|
checkDomain(t, itr, bz(""), nil)
|
||||||
|
checkItem(t, itr, bz(""), bz("value"))
|
||||||
|
checkNext(t, itr, false)
|
||||||
|
checkInvalid(t, itr)
|
||||||
|
itr.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrefixDBReverseIterator4(t *testing.T) {
|
||||||
|
db := mockDBWithStuff()
|
||||||
|
pdb := NewPrefixDB(db, bz("key"))
|
||||||
|
|
||||||
|
itr := pdb.ReverseIterator(bz(""), bz(""))
|
||||||
|
checkInvalid(t, itr)
|
||||||
itr.Close()
|
itr.Close()
|
||||||
}
|
}
|
||||||
|
|
26
db/util.go
26
db/util.go
|
@ -33,6 +33,29 @@ func cpIncr(bz []byte) (ret []byte) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns a slice of the same length (big endian)
|
||||||
|
// except decremented by one.
|
||||||
|
// Returns nil on underflow (e.g. if bz bytes are all 0x00)
|
||||||
|
// CONTRACT: len(bz) > 0
|
||||||
|
func cpDecr(bz []byte) (ret []byte) {
|
||||||
|
if len(bz) == 0 {
|
||||||
|
panic("cpDecr expects non-zero bz length")
|
||||||
|
}
|
||||||
|
ret = cp(bz)
|
||||||
|
for i := len(bz) - 1; i >= 0; i-- {
|
||||||
|
if ret[i] > byte(0x00) {
|
||||||
|
ret[i]--
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ret[i] = byte(0xFF)
|
||||||
|
if i == 0 {
|
||||||
|
// Underflow
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// See DB interface documentation for more information.
|
// See DB interface documentation for more information.
|
||||||
func IsKeyInDomain(key, start, end []byte, isReverse bool) bool {
|
func IsKeyInDomain(key, start, end []byte, isReverse bool) bool {
|
||||||
if !isReverse {
|
if !isReverse {
|
||||||
|
@ -43,7 +66,7 @@ func IsKeyInDomain(key, start, end []byte, isReverse bool) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
} else {
|
||||||
if start != nil && bytes.Compare(start, key) < 0 {
|
if start != nil && bytes.Compare(start, key) < 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -52,3 +75,4 @@ func IsKeyInDomain(key, start, end []byte, isReverse bool) bool {
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -60,9 +60,9 @@ func (sm *SimpleMap) KVPairs() cmn.KVPairs {
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
// A local extension to KVPair that can be hashed.
|
// A local extension to KVPair that can be hashed.
|
||||||
type kvPair cmn.KVPair
|
type KVPair cmn.KVPair
|
||||||
|
|
||||||
func (kv kvPair) Hash() []byte {
|
func (kv KVPair) Hash() []byte {
|
||||||
hasher := ripemd160.New()
|
hasher := ripemd160.New()
|
||||||
err := encodeByteSlice(hasher, kv.Key)
|
err := encodeByteSlice(hasher, kv.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -78,7 +78,7 @@ func (kv kvPair) Hash() []byte {
|
||||||
func hashKVPairs(kvs cmn.KVPairs) []byte {
|
func hashKVPairs(kvs cmn.KVPairs) []byte {
|
||||||
kvsH := make([]Hasher, 0, len(kvs))
|
kvsH := make([]Hasher, 0, len(kvs))
|
||||||
for _, kvp := range kvs {
|
for _, kvp := range kvs {
|
||||||
kvsH = append(kvsH, kvPair(kvp))
|
kvsH = append(kvsH, KVPair(kvp))
|
||||||
}
|
}
|
||||||
return SimpleHashFromHashers(kvsH)
|
return SimpleHashFromHashers(kvsH)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,20 @@ func SimpleProofsFromHashers(items []Hasher) (rootHash []byte, proofs []*SimpleP
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SimpleProofsFromMap(m map[string]Hasher) (rootHash []byte, proofs []*SimpleProof) {
|
||||||
|
sm := NewSimpleMap()
|
||||||
|
for k, v := range m {
|
||||||
|
sm.Set(k, v)
|
||||||
|
}
|
||||||
|
sm.Sort()
|
||||||
|
kvs := sm.kvs
|
||||||
|
kvsH := make([]Hasher, 0, len(kvs))
|
||||||
|
for _, kvp := range kvs {
|
||||||
|
kvsH = append(kvsH, KVPair(kvp))
|
||||||
|
}
|
||||||
|
return SimpleProofsFromHashers(kvsH)
|
||||||
|
}
|
||||||
|
|
||||||
// Verify that leafHash is a leaf hash of the simple-merkle-tree
|
// Verify that leafHash is a leaf hash of the simple-merkle-tree
|
||||||
// which hashes to rootHash.
|
// which hashes to rootHash.
|
||||||
func (sp *SimpleProof) Verify(index int, total int, leafHash []byte, rootHash []byte) bool {
|
func (sp *SimpleProof) Verify(index int, total int, leafHash []byte, rootHash []byte) bool {
|
||||||
|
|
|
@ -3,7 +3,7 @@ package merkle
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
|
||||||
. "github.com/tendermint/tmlibs/common"
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
. "github.com/tendermint/tmlibs/test"
|
. "github.com/tendermint/tmlibs/test"
|
||||||
|
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -21,7 +21,7 @@ func TestSimpleProof(t *testing.T) {
|
||||||
|
|
||||||
items := make([]Hasher, total)
|
items := make([]Hasher, total)
|
||||||
for i := 0; i < total; i++ {
|
for i := 0; i < total; i++ {
|
||||||
items[i] = testItem(RandBytes(32))
|
items[i] = testItem(cmn.RandBytes(32))
|
||||||
}
|
}
|
||||||
|
|
||||||
rootHash := SimpleHashFromHashers(items)
|
rootHash := SimpleHashFromHashers(items)
|
||||||
|
@ -53,7 +53,7 @@ func TestSimpleProof(t *testing.T) {
|
||||||
|
|
||||||
// Trail too long should make it fail
|
// Trail too long should make it fail
|
||||||
origAunts := proof.Aunts
|
origAunts := proof.Aunts
|
||||||
proof.Aunts = append(proof.Aunts, RandBytes(32))
|
proof.Aunts = append(proof.Aunts, cmn.RandBytes(32))
|
||||||
{
|
{
|
||||||
ok = proof.Verify(i, total, itemHash, rootHash)
|
ok = proof.Verify(i, total, itemHash, rootHash)
|
||||||
if ok {
|
if ok {
|
||||||
|
|
|
@ -21,7 +21,7 @@ func TestExample(t *testing.T) {
|
||||||
ch := make(chan interface{}, 1)
|
ch := make(chan interface{}, 1)
|
||||||
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
|
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"})
|
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assertReceive(t, "Tombstone", ch)
|
assertReceive(t, "Tombstone", ch)
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,18 +38,30 @@ var (
|
||||||
ErrAlreadySubscribed = errors.New("already subscribed")
|
ErrAlreadySubscribed = errors.New("already subscribed")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TagMap is used to associate tags to a message.
|
||||||
|
// They can be queried by subscribers to choose messages they will received.
|
||||||
|
type TagMap interface {
|
||||||
|
// Get returns the value for a key, or nil if no value is present.
|
||||||
|
// The ok result indicates whether value was found in the tags.
|
||||||
|
Get(key string) (value interface{}, ok bool)
|
||||||
|
// Len returns the number of tags.
|
||||||
|
Len() int
|
||||||
|
}
|
||||||
|
|
||||||
|
type tagMap map[string]interface{}
|
||||||
|
|
||||||
type cmd struct {
|
type cmd struct {
|
||||||
op operation
|
op operation
|
||||||
query Query
|
query Query
|
||||||
ch chan<- interface{}
|
ch chan<- interface{}
|
||||||
clientID string
|
clientID string
|
||||||
msg interface{}
|
msg interface{}
|
||||||
tags map[string]interface{}
|
tags TagMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query defines an interface for a query to be used for subscribing.
|
// Query defines an interface for a query to be used for subscribing.
|
||||||
type Query interface {
|
type Query interface {
|
||||||
Matches(tags map[string]interface{}) bool
|
Matches(tags TagMap) bool
|
||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,6 +80,23 @@ type Server struct {
|
||||||
// Option sets a parameter for the server.
|
// Option sets a parameter for the server.
|
||||||
type Option func(*Server)
|
type Option func(*Server)
|
||||||
|
|
||||||
|
// NewTagMap constructs a new immutable tag set from a map.
|
||||||
|
func NewTagMap(data map[string]interface{}) TagMap {
|
||||||
|
return tagMap(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the value for a key, or nil if no value is present.
|
||||||
|
// The ok result indicates whether value was found in the tags.
|
||||||
|
func (ts tagMap) Get(key string) (value interface{}, ok bool) {
|
||||||
|
value, ok = ts[key]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the number of tags.
|
||||||
|
func (ts tagMap) Len() int {
|
||||||
|
return len(ts)
|
||||||
|
}
|
||||||
|
|
||||||
// NewServer returns a new server. See the commentary on the Option functions
|
// NewServer returns a new server. See the commentary on the Option functions
|
||||||
// for a detailed description of how to configure buffering. If no options are
|
// for a detailed description of how to configure buffering. If no options are
|
||||||
// provided, the resulting server's queue is unbuffered.
|
// provided, the resulting server's queue is unbuffered.
|
||||||
|
@ -184,13 +213,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
|
||||||
// Publish publishes the given message. An error will be returned to the caller
|
// Publish publishes the given message. An error will be returned to the caller
|
||||||
// if the context is canceled.
|
// if the context is canceled.
|
||||||
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
|
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
|
||||||
return s.PublishWithTags(ctx, msg, make(map[string]interface{}))
|
return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{})))
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishWithTags publishes the given message with the set of tags. The set is
|
// PublishWithTags publishes the given message with the set of tags. The set is
|
||||||
// matched with clients queries. If there is a match, the message is sent to
|
// matched with clients queries. If there is a match, the message is sent to
|
||||||
// the client.
|
// the client.
|
||||||
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error {
|
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
|
||||||
select {
|
select {
|
||||||
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
|
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
|
||||||
return nil
|
return nil
|
||||||
|
@ -302,7 +331,7 @@ func (state *state) removeAll(clientID string) {
|
||||||
delete(state.clients, clientID)
|
delete(state.clients, clientID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (state *state) send(msg interface{}, tags map[string]interface{}) {
|
func (state *state) send(msg interface{}, tags TagMap) {
|
||||||
for q, clientToChannelMap := range state.queries {
|
for q, clientToChannelMap := range state.queries {
|
||||||
if q.Matches(tags) {
|
if q.Matches(tags) {
|
||||||
for _, ch := range clientToChannelMap {
|
for _, ch := range clientToChannelMap {
|
||||||
|
|
|
@ -48,14 +48,14 @@ func TestDifferentClients(t *testing.T) {
|
||||||
ch1 := make(chan interface{}, 1)
|
ch1 := make(chan interface{}, 1)
|
||||||
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
|
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
|
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assertReceive(t, "Iceman", ch1)
|
assertReceive(t, "Iceman", ch1)
|
||||||
|
|
||||||
ch2 := make(chan interface{}, 1)
|
ch2 := make(chan interface{}, 1)
|
||||||
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
|
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
|
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assertReceive(t, "Ultimo", ch1)
|
assertReceive(t, "Ultimo", ch1)
|
||||||
assertReceive(t, "Ultimo", ch2)
|
assertReceive(t, "Ultimo", ch2)
|
||||||
|
@ -63,7 +63,7 @@ func TestDifferentClients(t *testing.T) {
|
||||||
ch3 := make(chan interface{}, 1)
|
ch3 := make(chan interface{}, 1)
|
||||||
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
|
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
|
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Zero(t, len(ch3))
|
assert.Zero(t, len(ch3))
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,7 @@ func TestClientSubscribesTwice(t *testing.T) {
|
||||||
ch1 := make(chan interface{}, 1)
|
ch1 := make(chan interface{}, 1)
|
||||||
err := s.Subscribe(ctx, clientID, q, ch1)
|
err := s.Subscribe(ctx, clientID, q, ch1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
|
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assertReceive(t, "Goblin Queen", ch1)
|
assertReceive(t, "Goblin Queen", ch1)
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ func TestClientSubscribesTwice(t *testing.T) {
|
||||||
err = s.Subscribe(ctx, clientID, q, ch2)
|
err = s.Subscribe(ctx, clientID, q, ch2)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
|
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assertReceive(t, "Spider-Man", ch1)
|
assertReceive(t, "Spider-Man", ch1)
|
||||||
}
|
}
|
||||||
|
@ -208,7 +208,7 @@ func benchmarkNClients(n int, b *testing.B) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})
|
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,7 +231,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})
|
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
package query
|
package query
|
||||||
|
|
||||||
|
import "github.com/tendermint/tmlibs/pubsub"
|
||||||
|
|
||||||
// Empty query matches any set of tags.
|
// Empty query matches any set of tags.
|
||||||
type Empty struct {
|
type Empty struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Matches always returns true.
|
// Matches always returns true.
|
||||||
func (Empty) Matches(tags map[string]interface{}) bool {
|
func (Empty) Matches(tags pubsub.TagMap) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,13 +4,14 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/tendermint/tmlibs/pubsub"
|
||||||
"github.com/tendermint/tmlibs/pubsub/query"
|
"github.com/tendermint/tmlibs/pubsub/query"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestEmptyQueryMatchesAnything(t *testing.T) {
|
func TestEmptyQueryMatchesAnything(t *testing.T) {
|
||||||
q := query.Empty{}
|
q := query.Empty{}
|
||||||
assert.True(t, q.Matches(map[string]interface{}{}))
|
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{})))
|
||||||
assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"}))
|
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"})))
|
||||||
assert.True(t, q.Matches(map[string]interface{}{"Route": 66}))
|
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66})))
|
||||||
assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"}))
|
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"})))
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,8 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/tendermint/tmlibs/pubsub"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Query holds the query string and the query parser.
|
// Query holds the query string and the query parser.
|
||||||
|
@ -145,8 +147,8 @@ func (q *Query) Conditions() []Condition {
|
||||||
//
|
//
|
||||||
// For example, query "name=John" matches tags = {"name": "John"}. More
|
// For example, query "name=John" matches tags = {"name": "John"}. More
|
||||||
// examples could be found in parser_test.go and query_test.go.
|
// examples could be found in parser_test.go and query_test.go.
|
||||||
func (q *Query) Matches(tags map[string]interface{}) bool {
|
func (q *Query) Matches(tags pubsub.TagMap) bool {
|
||||||
if len(tags) == 0 {
|
if tags.Len() == 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,9 +233,9 @@ func (q *Query) Matches(tags map[string]interface{}) bool {
|
||||||
// value from it to the operand using the operator.
|
// value from it to the operand using the operator.
|
||||||
//
|
//
|
||||||
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
|
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
|
||||||
func match(tag string, op Operator, operand reflect.Value, tags map[string]interface{}) bool {
|
func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool {
|
||||||
// look up the tag from the query in tags
|
// look up the tag from the query in tags
|
||||||
value, ok := tags[tag]
|
value, ok := tags.Get(tag)
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/tendermint/tmlibs/pubsub"
|
||||||
"github.com/tendermint/tmlibs/pubsub/query"
|
"github.com/tendermint/tmlibs/pubsub/query"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,9 +52,9 @@ func TestMatches(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if tc.matches {
|
if tc.matches {
|
||||||
assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
|
assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags)
|
||||||
} else {
|
} else {
|
||||||
assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
|
assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
package version
|
package version
|
||||||
|
|
||||||
const Version = "0.8.1"
|
const Version = "0.8.3"
|
||||||
|
|
Loading…
Reference in New Issue