diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a55fb55..1e41face 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,31 @@ # Changelog +## 0.8.3 + +*May 21, 2018* + +FEATURES: + + - [common] ASCIITrim() + +## 0.8.2 (April 23rd, 2018) + +FEATURES: + + - [pubsub] TagMap, NewTagMap + - [merkle] SimpleProofsFromMap() + - [common] IsASCIIText() + - [common] PrefixEndBytes // e.g. increment or nil + - [common] BitArray.MarshalJSON/.UnmarshalJSON + - [common] BitArray uses 'x' not 'X' for String() and above. + - [db] DebugDB shows better colorized output + +BUG FIXES: + + - [common] Fix TestParallelAbort nondeterministic failure #201/#202 + - [db] PrefixDB Iterator/ReverseIterator fixes + - [db] DebugDB fixes + ## 0.8.1 (April 5th, 2018) FEATURES: diff --git a/common/async.go b/common/async.go index 49714d95..7be09a3c 100644 --- a/common/async.go +++ b/common/async.go @@ -32,7 +32,7 @@ type TaskResultSet struct { func newTaskResultSet(chz []TaskResultCh) *TaskResultSet { return &TaskResultSet{ chz: chz, - results: nil, + results: make([]taskResultOK, len(chz)), } } @@ -49,18 +49,20 @@ func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) { } // NOTE: Not concurrency safe. +// Writes results to trs.results without waiting for all tasks to complete. func (trs *TaskResultSet) Reap() *TaskResultSet { - if trs.results == nil { - trs.results = make([]taskResultOK, len(trs.chz)) - } for i := 0; i < len(trs.results); i++ { var trch = trs.chz[i] select { - case result := <-trch: - // Overwrite result. - trs.results[i] = taskResultOK{ - TaskResult: result, - OK: true, + case result, ok := <-trch: + if ok { + // Write result. + trs.results[i] = taskResultOK{ + TaskResult: result, + OK: true, + } + } else { + // We already wrote it. } default: // Do nothing. @@ -69,6 +71,27 @@ func (trs *TaskResultSet) Reap() *TaskResultSet { return trs } +// NOTE: Not concurrency safe. +// Like Reap() but waits until all tasks have returned or panic'd. +func (trs *TaskResultSet) Wait() *TaskResultSet { + for i := 0; i < len(trs.results); i++ { + var trch = trs.chz[i] + select { + case result, ok := <-trch: + if ok { + // Write result. + trs.results[i] = taskResultOK{ + TaskResult: result, + OK: true, + } + } else { + // We already wrote it. + } + } + } + return trs +} + // Returns the firstmost (by task index) error as // discovered by all previous Reap() calls. func (trs *TaskResultSet) FirstValue() interface{} { @@ -116,7 +139,11 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) { defer func() { if pnk := recover(); pnk != nil { atomic.AddInt32(numPanics, 1) + // Send panic to taskResultCh. taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")} + // Closing taskResultCh lets trs.Wait() work. + close(taskResultCh) + // Decrement waitgroup. taskDoneCh <- false } }() @@ -125,6 +152,8 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) { // Send val/err to taskResultCh. // NOTE: Below this line, nothing must panic/ taskResultCh <- TaskResult{val, err} + // Closing taskResultCh lets trs.Wait() work. + close(taskResultCh) // Decrement waitgroup. taskDoneCh <- abort }(i, task, taskResultCh) diff --git a/common/async_test.go b/common/async_test.go index 9f060ca2..037afcaa 100644 --- a/common/async_test.go +++ b/common/async_test.go @@ -91,10 +91,14 @@ func TestParallelAbort(t *testing.T) { // Now let the last task (#3) complete after abort. flow4 <- <-flow3 + // Wait until all tasks have returned or panic'd. + taskResultSet.Wait() + // Verify task #0, #1, #2. 
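Before the test assertions continue below, here is a minimal sketch of the Parallel/Wait call pattern these async.go changes enable. It assumes the package's Task signature is func(i int) (val interface{}, err error, abort bool), which is not shown in these hunks, and the printed TaskResult layout is illustrative only.

package main

import (
	"errors"
	"fmt"

	cmn "github.com/tendermint/tmlibs/common"
)

func main() {
	trs, ok := cmn.Parallel(
		func(_ int) (interface{}, error, bool) { return "a", nil, false },
		func(_ int) (interface{}, error, bool) { return nil, errors.New("some error"), true }, // aborts
		func(_ int) (interface{}, error, bool) { return "c", nil, false },
	)
	// Reap() only drains results that are already available; Wait() blocks
	// until every task has returned or panic'd, which works because each task
	// goroutine now closes its result channel when it is done.
	trs.Wait()
	for i := 0; i < 3; i++ {
		result, loaded := trs.LatestResult(i)
		fmt.Printf("task %d: loaded=%v result=%+v\n", i, loaded, result)
	}
	fmt.Println("all tasks ok:", ok)
}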
checkResult(t, taskResultSet, 0, 0, nil, nil) checkResult(t, taskResultSet, 1, 1, errors.New("some error"), nil) checkResult(t, taskResultSet, 2, 2, nil, nil) + checkResult(t, taskResultSet, 3, 3, nil, nil) } func TestParallelRecover(t *testing.T) { diff --git a/common/bit_array.go b/common/bit_array.go index ea6a6ee1..0290921a 100644 --- a/common/bit_array.go +++ b/common/bit_array.go @@ -3,6 +3,7 @@ package common import ( "encoding/binary" "fmt" + "regexp" "strings" "sync" ) @@ -249,13 +250,14 @@ func (bA *BitArray) PickRandom() (int, bool) { return 0, false } +// String returns a string representation of BitArray: BA{}, +// where is a sequence of 'x' (1) and '_' (0). +// The includes spaces and newlines to help people. +// For a simple sequence of 'x' and '_' characters with no spaces or newlines, +// see the MarshalJSON() method. +// Example: "BA{_x_}" or "nil-BitArray" for nil. func (bA *BitArray) String() string { - if bA == nil { - return "nil-BitArray" - } - bA.mtx.Lock() - defer bA.mtx.Unlock() - return bA.stringIndented("") + return bA.StringIndented("") } func (bA *BitArray) StringIndented(indent string) string { @@ -268,12 +270,11 @@ func (bA *BitArray) StringIndented(indent string) string { } func (bA *BitArray) stringIndented(indent string) string { - lines := []string{} bits := "" for i := 0; i < bA.Bits; i++ { if bA.getIndex(i) { - bits += "X" + bits += "x" } else { bits += "_" } @@ -282,10 +283,10 @@ func (bA *BitArray) stringIndented(indent string) string { bits = "" } if i%10 == 9 { - bits += " " + bits += indent } if i%50 == 49 { - bits += " " + bits += indent } } if len(bits) > 0 { @@ -320,3 +321,58 @@ func (bA *BitArray) Update(o *BitArray) { copy(bA.Elems, o.Elems) } + +// MarshalJSON implements json.Marshaler interface by marshaling bit array +// using a custom format: a string of '-' or 'x' where 'x' denotes the 1 bit. +func (bA *BitArray) MarshalJSON() ([]byte, error) { + if bA == nil { + return []byte("null"), nil + } + + bA.mtx.Lock() + defer bA.mtx.Unlock() + + bits := `"` + for i := 0; i < bA.Bits; i++ { + if bA.getIndex(i) { + bits += `x` + } else { + bits += `_` + } + } + bits += `"` + return []byte(bits), nil +} + +var bitArrayJSONRegexp = regexp.MustCompile(`\A"([_x]*)"\z`) + +// UnmarshalJSON implements json.Unmarshaler interface by unmarshaling a custom +// JSON description. +func (bA *BitArray) UnmarshalJSON(bz []byte) error { + b := string(bz) + if b == "null" { + // This is required e.g. for encoding/json when decoding + // into a pointer with pre-allocated BitArray. + bA.Bits = 0 + bA.Elems = nil + return nil + } + + // Validate 'b'. + match := bitArrayJSONRegexp.FindStringSubmatch(b) + if match == nil { + return fmt.Errorf("BitArray in JSON should be a string of format %q but got %s", bitArrayJSONRegexp.String(), b) + } + bits := match[1] + + // Construct new BitArray and copy over. 
+ numBits := len(bits) + bA2 := NewBitArray(numBits) + for i := 0; i < numBits; i++ { + if bits[i] == 'x' { + bA2.SetIndex(i, true) + } + } + *bA = *bA2 + return nil +} diff --git a/common/bit_array_test.go b/common/bit_array_test.go index fbc438cd..c697ba5d 100644 --- a/common/bit_array_test.go +++ b/common/bit_array_test.go @@ -2,8 +2,10 @@ package common import ( "bytes" + "encoding/json" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -210,8 +212,56 @@ func TestUpdateNeverPanics(t *testing.T) { } func TestNewBitArrayNeverCrashesOnNegatives(t *testing.T) { - bitList := []int{-127, -128, -1<<31} + bitList := []int{-127, -128, -1 << 31} for _, bits := range bitList { _ = NewBitArray(bits) } } + +func TestJSONMarshalUnmarshal(t *testing.T) { + + bA1 := NewBitArray(0) + + bA2 := NewBitArray(1) + + bA3 := NewBitArray(1) + bA3.SetIndex(0, true) + + bA4 := NewBitArray(5) + bA4.SetIndex(0, true) + bA4.SetIndex(1, true) + + testCases := []struct { + bA *BitArray + marshalledBA string + }{ + {nil, `null`}, + {bA1, `null`}, + {bA2, `"_"`}, + {bA3, `"x"`}, + {bA4, `"xx___"`}, + } + + for _, tc := range testCases { + t.Run(tc.bA.String(), func(t *testing.T) { + bz, err := json.Marshal(tc.bA) + require.NoError(t, err) + + assert.Equal(t, tc.marshalledBA, string(bz)) + + var unmarshalledBA *BitArray + err = json.Unmarshal(bz, &unmarshalledBA) + require.NoError(t, err) + + if tc.bA == nil { + require.Nil(t, unmarshalledBA) + } else { + require.NotNil(t, unmarshalledBA) + assert.EqualValues(t, tc.bA.Bits, unmarshalledBA.Bits) + if assert.EqualValues(t, tc.bA.String(), unmarshalledBA.String()) { + assert.EqualValues(t, tc.bA.Elems, unmarshalledBA.Elems) + } + } + }) + } +} diff --git a/common/byteslice.go b/common/byteslice.go index ceaf06bd..57b3a8a2 100644 --- a/common/byteslice.go +++ b/common/byteslice.go @@ -45,3 +45,29 @@ func TrimmedString(b []byte) string { return string(bytes.TrimLeft(b, trimSet)) } + +// PrefixEndBytes returns the end byteslice for a noninclusive range +// that would include all byte slices for which the input is the prefix +func PrefixEndBytes(prefix []byte) []byte { + if prefix == nil { + return nil + } + + end := make([]byte, len(prefix)) + copy(end, prefix) + finished := false + + for !finished { + if end[len(end)-1] != byte(255) { + end[len(end)-1]++ + finished = true + } else { + end = end[:len(end)-1] + if len(end) == 0 { + end = nil + finished = true + } + } + } + return end +} diff --git a/common/byteslice_test.go b/common/byteslice_test.go new file mode 100644 index 00000000..98085d12 --- /dev/null +++ b/common/byteslice_test.go @@ -0,0 +1,28 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPrefixEndBytes(t *testing.T) { + assert := assert.New(t) + + var testCases = []struct { + prefix []byte + expected []byte + }{ + {[]byte{byte(55), byte(255), byte(255), byte(0)}, []byte{byte(55), byte(255), byte(255), byte(1)}}, + {[]byte{byte(55), byte(255), byte(255), byte(15)}, []byte{byte(55), byte(255), byte(255), byte(16)}}, + {[]byte{byte(55), byte(200), byte(255)}, []byte{byte(55), byte(201)}}, + {[]byte{byte(55), byte(255), byte(255)}, []byte{byte(56)}}, + {[]byte{byte(255), byte(255), byte(255)}, nil}, + {nil, nil}, + } + + for _, test := range testCases { + end := PrefixEndBytes(test.prefix) + assert.Equal(test.expected, end) + } +} diff --git a/common/errors.go b/common/errors.go index 1ee1fb34..5992b234 100644 --- a/common/errors.go +++ b/common/errors.go @@ -178,7 +178,7 @@ 
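Stepping back to the new PrefixEndBytes helper above: it produces the exclusive end bound for a half-open [start, end) iteration over every key that begins with a given prefix. A hedged sketch, with illustrative keys and values:

package main

import (
	"fmt"

	cmn "github.com/tendermint/tmlibs/common"
	dbm "github.com/tendermint/tmlibs/db"
)

func main() {
	db := dbm.NewMemDB()
	db.Set([]byte{0x05, 0xFF, 0x01}, []byte("a"))
	db.Set([]byte{0x05, 0xFF, 0xFE}, []byte("b"))
	db.Set([]byte{0x06, 0x00}, []byte("outside the prefix"))

	prefix := []byte{0x05, 0xFF}
	end := cmn.PrefixEndBytes(prefix) // []byte{0x06}; nil if prefix is all 0xFF

	itr := db.Iterator(prefix, end) // visits only the keys that start with prefix
	defer itr.Close()
	for ; itr.Valid(); itr.Next() {
		fmt.Printf("%X -> %s\n", itr.Key(), itr.Value())
	}
}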
func (err *cmnError) Format(s fmt.State, verb rune) { if s.Flag('#') { s.Write([]byte("--= Error =--\n")) // Write msg. - s.Write([]byte(fmt.Sprintf("Message: %#s\n", err.msg))) + s.Write([]byte(fmt.Sprintf("Message: %s\n", err.msg))) // Write cause. s.Write([]byte(fmt.Sprintf("Cause: %#v\n", err.cause))) // Write type. diff --git a/common/os_test.go b/common/os_test.go index 97ad672b..973d6890 100644 --- a/common/os_test.go +++ b/common/os_test.go @@ -23,11 +23,11 @@ func TestWriteFileAtomic(t *testing.T) { } defer os.Remove(f.Name()) - if err := ioutil.WriteFile(f.Name(), old, 0664); err != nil { + if err = ioutil.WriteFile(f.Name(), old, 0664); err != nil { t.Fatal(err) } - if err := WriteFileAtomic(f.Name(), data, perm); err != nil { + if err = WriteFileAtomic(f.Name(), data, perm); err != nil { t.Fatal(err) } diff --git a/common/string.go b/common/string.go index 0e2231e9..fac1be6c 100644 --- a/common/string.go +++ b/common/string.go @@ -57,3 +57,33 @@ func SplitAndTrim(s, sep, cutset string) []string { } return spl } + +// Returns true if s is a non-empty printable non-tab ascii character. +func IsASCIIText(s string) bool { + if len(s) == 0 { + return false + } + for _, b := range []byte(s) { + if 32 <= b && b <= 126 { + // good + } else { + return false + } + } + return true +} + +// NOTE: Assumes that s is ASCII as per IsASCIIText(), otherwise panics. +func ASCIITrim(s string) string { + r := make([]byte, 0, len(s)) + for _, b := range []byte(s) { + if b == 32 { + continue // skip space + } else if 32 < b && b <= 126 { + r = append(r, b) + } else { + panic(fmt.Sprintf("non-ASCII (non-tab) char 0x%X", b)) + } + } + return string(r) +} diff --git a/common/string_test.go b/common/string_test.go index 82ba6784..5d1b68fe 100644 --- a/common/string_test.go +++ b/common/string_test.go @@ -49,3 +49,26 @@ func TestSplitAndTrim(t *testing.T) { assert.Equal(t, tc.expected, SplitAndTrim(tc.s, tc.sep, tc.cutset), "%s", tc.s) } } + +func TestIsASCIIText(t *testing.T) { + notASCIIText := []string{ + "", "\xC2", "\xC2\xA2", "\xFF", "\x80", "\xF0", "\n", "\t", + } + for _, v := range notASCIIText { + assert.False(t, IsHex(v), "%q is not ascii-text", v) + } + asciiText := []string{ + " ", ".", "x", "$", "_", "abcdefg;", "-", "0x00", "0", "123", + } + for _, v := range asciiText { + assert.True(t, IsASCIIText(v), "%q is ascii-text", v) + } +} + +func TestASCIITrim(t *testing.T) { + assert.Equal(t, ASCIITrim(" "), "") + assert.Equal(t, ASCIITrim(" a"), "a") + assert.Equal(t, ASCIITrim("a "), "a") + assert.Equal(t, ASCIITrim(" a "), "a") + assert.Panics(t, func() { ASCIITrim("\xC2\xA2") }) +} diff --git a/db/common_test.go b/db/common_test.go index 1d8d52c5..6af6e15e 100644 --- a/db/common_test.go +++ b/db/common_test.go @@ -33,6 +33,12 @@ func checkNextPanics(t *testing.T, itr Iterator) { assert.Panics(t, func() { itr.Next() }, "checkNextPanics expected panic but didn't") } +func checkDomain(t *testing.T, itr Iterator, start, end []byte) { + ds, de := itr.Domain() + assert.Equal(t, start, ds, "checkDomain domain start incorrect") + assert.Equal(t, end, de, "checkDomain domain end incorrect") +} + func checkItem(t *testing.T, itr Iterator, key []byte, value []byte) { k, v := itr.Key(), itr.Value() assert.Exactly(t, key, k) diff --git a/db/debug_db.go b/db/debug_db.go index 7a15bc29..7666ed9f 100644 --- a/db/debug_db.go +++ b/db/debug_db.go @@ -3,8 +3,14 @@ package db import ( "fmt" "sync" + + cmn "github.com/tendermint/tmlibs/common" ) +func _fmt(f string, az ...interface{}) string { + return 
fmt.Sprintf(f, az...) +} + //---------------------------------------- // debugDB @@ -26,78 +32,84 @@ func (ddb debugDB) Mutex() *sync.Mutex { return nil } // Implements DB. func (ddb debugDB) Get(key []byte) (value []byte) { - defer fmt.Printf("%v.Get(%X) %X\n", ddb.label, key, value) + defer func() { + fmt.Printf("%v.Get(%v) %v\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Blue(_fmt("%X", value))) + }() value = ddb.db.Get(key) return } // Implements DB. func (ddb debugDB) Has(key []byte) (has bool) { - defer fmt.Printf("%v.Has(%X) %v\n", ddb.label, key, has) + defer func() { + fmt.Printf("%v.Has(%v) %v\n", ddb.label, cmn.Cyan(_fmt("%X", key)), has) + }() return ddb.db.Has(key) } // Implements DB. func (ddb debugDB) Set(key []byte, value []byte) { - fmt.Printf("%v.Set(%X, %X)\n", ddb.label, key, value) + fmt.Printf("%v.Set(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value))) ddb.db.Set(key, value) } // Implements DB. func (ddb debugDB) SetSync(key []byte, value []byte) { - fmt.Printf("%v.SetSync(%X, %X)\n", ddb.label, key, value) + fmt.Printf("%v.SetSync(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value))) ddb.db.SetSync(key, value) } // Implements atomicSetDeleter. func (ddb debugDB) SetNoLock(key []byte, value []byte) { - fmt.Printf("%v.SetNoLock(%X, %X)\n", ddb.label, key, value) - ddb.db.Set(key, value) + fmt.Printf("%v.SetNoLock(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value))) + ddb.db.(atomicSetDeleter).SetNoLock(key, value) } // Implements atomicSetDeleter. func (ddb debugDB) SetNoLockSync(key []byte, value []byte) { - fmt.Printf("%v.SetNoLockSync(%X, %X)\n", ddb.label, key, value) - ddb.db.SetSync(key, value) + fmt.Printf("%v.SetNoLockSync(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value))) + ddb.db.(atomicSetDeleter).SetNoLockSync(key, value) } // Implements DB. func (ddb debugDB) Delete(key []byte) { - fmt.Printf("%v.Delete(%X)\n", ddb.label, key) + fmt.Printf("%v.Delete(%v)\n", ddb.label, cmn.Red(_fmt("%X", key))) ddb.db.Delete(key) } // Implements DB. func (ddb debugDB) DeleteSync(key []byte) { - fmt.Printf("%v.DeleteSync(%X)\n", ddb.label, key) + fmt.Printf("%v.DeleteSync(%v)\n", ddb.label, cmn.Red(_fmt("%X", key))) ddb.db.DeleteSync(key) } // Implements atomicSetDeleter. func (ddb debugDB) DeleteNoLock(key []byte) { - fmt.Printf("%v.DeleteNoLock(%X)\n", ddb.label, key) - ddb.db.Delete(key) + fmt.Printf("%v.DeleteNoLock(%v)\n", ddb.label, cmn.Red(_fmt("%X", key))) + ddb.db.(atomicSetDeleter).DeleteNoLock(key) } // Implements atomicSetDeleter. func (ddb debugDB) DeleteNoLockSync(key []byte) { - fmt.Printf("%v.DeleteNoLockSync(%X)\n", ddb.label, key) - ddb.db.DeleteSync(key) + fmt.Printf("%v.DeleteNoLockSync(%v)\n", ddb.label, cmn.Red(_fmt("%X", key))) + ddb.db.(atomicSetDeleter).DeleteNoLockSync(key) } // Implements DB. func (ddb debugDB) Iterator(start, end []byte) Iterator { - fmt.Printf("%v.Iterator(%X, %X)\n", ddb.label, start, end) + fmt.Printf("%v.Iterator(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", start)), cmn.Blue(_fmt("%X", end))) return NewDebugIterator(ddb.label, ddb.db.Iterator(start, end)) } // Implements DB. func (ddb debugDB) ReverseIterator(start, end []byte) Iterator { - fmt.Printf("%v.ReverseIterator(%X, %X)\n", ddb.label, start, end) + fmt.Printf("%v.ReverseIterator(%v, %v)\n", ddb.label, cmn.Cyan(_fmt("%X", start)), cmn.Blue(_fmt("%X", end))) return NewDebugIterator(ddb.label, ddb.db.ReverseIterator(start, end)) } // Implements DB. 
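For reference, a small sketch of what the colorized DebugDB output above looks like in use. NewDebugDB(label, db) is assumed to be the constructor already defined in this file; it does not appear in these hunks.

package main

import dbm "github.com/tendermint/tmlibs/db"

func main() {
	db := dbm.NewDebugDB("mydb", dbm.NewMemDB())
	db.Set([]byte("k1"), []byte("v1")) // prints: mydb.Set(6B31, 7631)  (key cyan, value yellow)
	_ = db.Get([]byte("k1"))           // prints: mydb.Get(6B31) 7631   (key cyan, value blue)
}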
+// Panics if the underlying db is not an +// atomicSetDeleter. func (ddb debugDB) NewBatch() Batch { fmt.Printf("%v.NewBatch()\n", ddb.label) return NewDebugBatch(ddb.label, ddb.db.NewBatch()) @@ -137,14 +149,18 @@ func NewDebugIterator(label string, itr Iterator) debugIterator { // Implements Iterator. func (ditr debugIterator) Domain() (start []byte, end []byte) { - defer fmt.Printf("%v.itr.Domain() (%X,%X)\n", ditr.label, start, end) + defer func() { + fmt.Printf("%v.itr.Domain() (%X,%X)\n", ditr.label, start, end) + }() start, end = ditr.itr.Domain() return } // Implements Iterator. func (ditr debugIterator) Valid() (ok bool) { - defer fmt.Printf("%v.itr.Valid() %v\n", ditr.label, ok) + defer func() { + fmt.Printf("%v.itr.Valid() %v\n", ditr.label, ok) + }() ok = ditr.itr.Valid() return } @@ -157,14 +173,14 @@ func (ditr debugIterator) Next() { // Implements Iterator. func (ditr debugIterator) Key() (key []byte) { - fmt.Printf("%v.itr.Key() %X\n", ditr.label, key) + fmt.Printf("%v.itr.Key() %v\n", ditr.label, cmn.Cyan(_fmt("%X", key))) key = ditr.itr.Key() return } // Implements Iterator. func (ditr debugIterator) Value() (value []byte) { - fmt.Printf("%v.itr.Value() %X\n", ditr.label, value) + fmt.Printf("%v.itr.Value() %v\n", ditr.label, cmn.Blue(_fmt("%X", value))) value = ditr.itr.Value() return } @@ -193,13 +209,13 @@ func NewDebugBatch(label string, bch Batch) debugBatch { // Implements Batch. func (dbch debugBatch) Set(key, value []byte) { - fmt.Printf("%v.batch.Set(%X, %X)\n", dbch.label, key, value) + fmt.Printf("%v.batch.Set(%v, %v)\n", dbch.label, cmn.Cyan(_fmt("%X", key)), cmn.Yellow(_fmt("%X", value))) dbch.bch.Set(key, value) } // Implements Batch. func (dbch debugBatch) Delete(key []byte) { - fmt.Printf("%v.batch.Delete(%X)\n", dbch.label, key) + fmt.Printf("%v.batch.Delete(%v)\n", dbch.label, cmn.Red(_fmt("%X", key))) dbch.bch.Delete(key) } diff --git a/db/mem_batch.go b/db/mem_batch.go index 81a63d62..5c5d0c13 100644 --- a/db/mem_batch.go +++ b/db/mem_batch.go @@ -1,6 +1,8 @@ package db -import "sync" +import ( + "sync" +) type atomicSetDeleter interface { Mutex() *sync.Mutex @@ -66,6 +68,5 @@ func (mBatch *memBatch) write(doSync bool) { case opTypeDelete: mBatch.db.DeleteNoLock(op.key) } - } } diff --git a/db/mem_db.go b/db/mem_db.go index 2d802947..1521f87a 100644 --- a/db/mem_db.go +++ b/db/mem_db.go @@ -37,7 +37,8 @@ func (db *MemDB) Get(key []byte) []byte { defer db.mtx.Unlock() key = nonNilBytes(key) - return db.db[string(key)] + value := db.db[string(key)] + return value } // Implements DB. 
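The memBatch plumbing above buffers Set/Delete operations and replays them through the NoLock variants under the DB's mutex when the batch is written. A small usage sketch:

package main

import dbm "github.com/tendermint/tmlibs/db"

func main() {
	db := dbm.NewMemDB()
	batch := db.NewBatch()
	batch.Set([]byte("a"), []byte("1"))
	batch.Set([]byte("b"), []byte("2"))
	batch.Delete([]byte("a"))
	batch.Write() // applies all buffered ops; WriteSync() forces a synchronous write
}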
@@ -162,7 +163,7 @@ func (db *MemDB) ReverseIterator(start, end []byte) Iterator { db.mtx.Lock() defer db.mtx.Unlock() - keys := db.getSortedKeys(end, start, true) + keys := db.getSortedKeys(start, end, true) return newMemDBIterator(db, keys, start, end) } @@ -236,7 +237,8 @@ func (itr *memDBIterator) assertIsValid() { func (db *MemDB) getSortedKeys(start, end []byte, reverse bool) []string { keys := []string{} for key := range db.db { - if IsKeyInDomain([]byte(key), start, end, false) { + inDomain := IsKeyInDomain([]byte(key), start, end, reverse) + if inDomain { keys = append(keys, key) } } @@ -244,7 +246,9 @@ func (db *MemDB) getSortedKeys(start, end []byte, reverse bool) []string { if reverse { nkeys := len(keys) for i := 0; i < nkeys/2; i++ { + temp := keys[i] keys[i] = keys[nkeys-i-1] + keys[nkeys-i-1] = temp } } return keys diff --git a/db/prefix_db.go b/db/prefix_db.go index 4381ce07..5bb53ebd 100644 --- a/db/prefix_db.go +++ b/db/prefix_db.go @@ -24,7 +24,8 @@ func IteratePrefix(db DB, prefix []byte) Iterator { TODO: Make test, maybe rename. // Like IteratePrefix but the iterator strips the prefix from the keys. func IteratePrefixStripped(db DB, prefix []byte) Iterator { - return newUnprefixIterator(prefix, IteratePrefix(db, prefix)) + start, end := ... + return newPrefixIterator(prefix, start, end, IteratePrefix(db, prefix)) } */ @@ -55,7 +56,9 @@ func (pdb *prefixDB) Get(key []byte) []byte { pdb.mtx.Lock() defer pdb.mtx.Unlock() - return pdb.db.Get(pdb.prefixed(key)) + pkey := pdb.prefixed(key) + value := pdb.db.Get(pkey) + return value } // Implements DB. @@ -71,7 +74,8 @@ func (pdb *prefixDB) Set(key []byte, value []byte) { pdb.mtx.Lock() defer pdb.mtx.Unlock() - pdb.db.Set(pdb.prefixed(key), value) + pkey := pdb.prefixed(key) + pdb.db.Set(pkey, value) } // Implements DB. @@ -82,16 +86,6 @@ func (pdb *prefixDB) SetSync(key []byte, value []byte) { pdb.db.SetSync(pdb.prefixed(key), value) } -// Implements atomicSetDeleter. -func (pdb *prefixDB) SetNoLock(key []byte, value []byte) { - pdb.db.Set(pdb.prefixed(key), value) -} - -// Implements atomicSetDeleter. -func (pdb *prefixDB) SetNoLockSync(key []byte, value []byte) { - pdb.db.SetSync(pdb.prefixed(key), value) -} - // Implements DB. func (pdb *prefixDB) Delete(key []byte) { pdb.mtx.Lock() @@ -108,28 +102,22 @@ func (pdb *prefixDB) DeleteSync(key []byte) { pdb.db.DeleteSync(pdb.prefixed(key)) } -// Implements atomicSetDeleter. -func (pdb *prefixDB) DeleteNoLock(key []byte) { - pdb.db.Delete(pdb.prefixed(key)) -} - -// Implements atomicSetDeleter. -func (pdb *prefixDB) DeleteNoLockSync(key []byte) { - pdb.db.DeleteSync(pdb.prefixed(key)) -} - // Implements DB. func (pdb *prefixDB) Iterator(start, end []byte) Iterator { pdb.mtx.Lock() defer pdb.mtx.Unlock() - pstart := append(pdb.prefix, start...) - pend := []byte(nil) - if end != nil { - pend = append(pdb.prefix, end...) + var pstart, pend []byte + pstart = append(cp(pdb.prefix), start...) + if end == nil { + pend = cpIncr(pdb.prefix) + } else { + pend = append(cp(pdb.prefix), end...) } - return newUnprefixIterator( + return newPrefixIterator( pdb.prefix, + start, + end, pdb.db.Iterator( pstart, pend, @@ -142,31 +130,68 @@ func (pdb *prefixDB) ReverseIterator(start, end []byte) Iterator { pdb.mtx.Lock() defer pdb.mtx.Unlock() - pstart := []byte(nil) - if start != nil { - pstart = append(pdb.prefix, start...) + var pstart, pend []byte + if start == nil { + // This may cause the underlying iterator to start with + // an item which doesn't start with prefix. 
We will skip + // that item later in this function. See 'skipOne'. + pstart = cpIncr(pdb.prefix) + } else { + pstart = append(cp(pdb.prefix), start...) } - pend := []byte(nil) - if end != nil { - pend = append(pdb.prefix, end...) + if end == nil { + // This may cause the underlying iterator to end with an + // item which doesn't start with prefix. The + // prefixIterator will terminate iteration + // automatically upon detecting this. + pend = cpDecr(pdb.prefix) + } else { + pend = append(cp(pdb.prefix), end...) } - return newUnprefixIterator( + ritr := pdb.db.ReverseIterator(pstart, pend) + if start == nil { + skipOne(ritr, cpIncr(pdb.prefix)) + } + return newPrefixIterator( pdb.prefix, - pdb.db.ReverseIterator( - pstart, - pend, - ), + start, + end, + ritr, ) } // Implements DB. +// Panics if the underlying DB is not an +// atomicSetDeleter. func (pdb *prefixDB) NewBatch() Batch { pdb.mtx.Lock() defer pdb.mtx.Unlock() - return &memBatch{pdb, nil} + return newPrefixBatch(pdb.prefix, pdb.db.NewBatch()) } +/* NOTE: Uncomment to use memBatch instead of prefixBatch +// Implements atomicSetDeleter. +func (pdb *prefixDB) SetNoLock(key []byte, value []byte) { + pdb.db.(atomicSetDeleter).SetNoLock(pdb.prefixed(key), value) +} + +// Implements atomicSetDeleter. +func (pdb *prefixDB) SetNoLockSync(key []byte, value []byte) { + pdb.db.(atomicSetDeleter).SetNoLockSync(pdb.prefixed(key), value) +} + +// Implements atomicSetDeleter. +func (pdb *prefixDB) DeleteNoLock(key []byte) { + pdb.db.(atomicSetDeleter).DeleteNoLock(pdb.prefixed(key)) +} + +// Implements atomicSetDeleter. +func (pdb *prefixDB) DeleteNoLockSync(key []byte) { + pdb.db.(atomicSetDeleter).DeleteNoLockSync(pdb.prefixed(key)) +} +*/ + // Implements DB. func (pdb *prefixDB) Close() { pdb.mtx.Lock() @@ -201,52 +226,109 @@ func (pdb *prefixDB) Stats() map[string]string { } func (pdb *prefixDB) prefixed(key []byte) []byte { - return append(pdb.prefix, key...) + return append(cp(pdb.prefix), key...) } //---------------------------------------- +// prefixBatch -// Strips prefix while iterating from Iterator. -type unprefixIterator struct { +type prefixBatch struct { prefix []byte - source Iterator + source Batch } -func newUnprefixIterator(prefix []byte, source Iterator) unprefixIterator { - return unprefixIterator{ +func newPrefixBatch(prefix []byte, source Batch) prefixBatch { + return prefixBatch{ prefix: prefix, source: source, } } -func (itr unprefixIterator) Domain() (start []byte, end []byte) { - start, end = itr.source.Domain() - if len(start) > 0 { - start = stripPrefix(start, itr.prefix) - } - if len(end) > 0 { - end = stripPrefix(end, itr.prefix) - } - return +func (pb prefixBatch) Set(key, value []byte) { + pkey := append(cp(pb.prefix), key...) + pb.source.Set(pkey, value) } -func (itr unprefixIterator) Valid() bool { - return itr.source.Valid() +func (pb prefixBatch) Delete(key []byte) { + pkey := append(cp(pb.prefix), key...) + pb.source.Delete(pkey) } -func (itr unprefixIterator) Next() { +func (pb prefixBatch) Write() { + pb.source.Write() +} + +func (pb prefixBatch) WriteSync() { + pb.source.WriteSync() +} + +//---------------------------------------- +// prefixIterator + +// Strips prefix while iterating from Iterator. 
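Before the prefixIterator type below, a sketch of the end-to-end behavior these changes give a prefixDB: keys are stored as prefix||key in the underlying DB and come back with the prefix stripped, including when iterating in reverse.

package main

import (
	"fmt"

	dbm "github.com/tendermint/tmlibs/db"
)

func main() {
	db := dbm.NewMemDB()
	pdb := dbm.NewPrefixDB(db, []byte("key"))
	pdb.Set([]byte("1"), []byte("value1")) // stored as "key1" in the underlying db
	pdb.Set([]byte("2"), []byte("value2"))

	itr := pdb.ReverseIterator(nil, nil) // yields "2" then "1", prefix stripped
	defer itr.Close()
	for ; itr.Valid(); itr.Next() {
		fmt.Printf("%s -> %s\n", itr.Key(), itr.Value())
	}
}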
+type prefixIterator struct { + prefix []byte + start []byte + end []byte + source Iterator + valid bool +} + +func newPrefixIterator(prefix, start, end []byte, source Iterator) prefixIterator { + if !source.Valid() || !bytes.HasPrefix(source.Key(), prefix) { + return prefixIterator{ + prefix: prefix, + start: start, + end: end, + source: source, + valid: false, + } + } else { + return prefixIterator{ + prefix: prefix, + start: start, + end: end, + source: source, + valid: true, + } + } +} + +func (itr prefixIterator) Domain() (start []byte, end []byte) { + return itr.start, itr.end +} + +func (itr prefixIterator) Valid() bool { + return itr.valid && itr.source.Valid() +} + +func (itr prefixIterator) Next() { + if !itr.valid { + panic("prefixIterator invalid, cannot call Next()") + } itr.source.Next() + if !itr.source.Valid() || !bytes.HasPrefix(itr.source.Key(), itr.prefix) { + itr.source.Close() + itr.valid = false + return + } } -func (itr unprefixIterator) Key() (key []byte) { +func (itr prefixIterator) Key() (key []byte) { + if !itr.valid { + panic("prefixIterator invalid, cannot call Key()") + } return stripPrefix(itr.source.Key(), itr.prefix) } -func (itr unprefixIterator) Value() (value []byte) { +func (itr prefixIterator) Value() (value []byte) { + if !itr.valid { + panic("prefixIterator invalid, cannot call Value()") + } return itr.source.Value() } -func (itr unprefixIterator) Close() { +func (itr prefixIterator) Close() { itr.source.Close() } @@ -261,3 +343,13 @@ func stripPrefix(key []byte, prefix []byte) (stripped []byte) { } return key[len(prefix):] } + +// If the first iterator item is skipKey, then +// skip it. +func skipOne(itr Iterator, skipKey []byte) { + if itr.Valid() { + if bytes.Equal(itr.Key(), skipKey) { + itr.Next() + } + } +} diff --git a/db/prefix_db_test.go b/db/prefix_db_test.go index fd44a7ec..60809f15 100644 --- a/db/prefix_db_test.go +++ b/db/prefix_db_test.go @@ -2,7 +2,7 @@ package db import "testing" -func TestIteratePrefix(t *testing.T) { +func mockDBWithStuff() DB { db := NewMemDB() // Under "key" prefix db.Set(bz("key"), bz("value")) @@ -14,10 +14,13 @@ func TestIteratePrefix(t *testing.T) { db.Set(bz("k"), bz("val")) db.Set(bz("ke"), bz("valu")) db.Set(bz("kee"), bz("valuu")) - xitr := db.Iterator(nil, nil) - xitr.Key() + return db +} +func TestPrefixDBSimple(t *testing.T) { + db := mockDBWithStuff() pdb := NewPrefixDB(db, bz("key")) + checkValue(t, pdb, bz("key"), nil) checkValue(t, pdb, bz(""), bz("value")) checkValue(t, pdb, bz("key1"), nil) @@ -30,9 +33,14 @@ func TestIteratePrefix(t *testing.T) { checkValue(t, pdb, bz("k"), nil) checkValue(t, pdb, bz("ke"), nil) checkValue(t, pdb, bz("kee"), nil) +} + +func TestPrefixDBIterator1(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) itr := pdb.Iterator(nil, nil) - itr.Key() + checkDomain(t, itr, nil, nil) checkItem(t, itr, bz(""), bz("value")) checkNext(t, itr, true) checkItem(t, itr, bz("1"), bz("value1")) @@ -40,5 +48,100 @@ func TestIteratePrefix(t *testing.T) { checkItem(t, itr, bz("2"), bz("value2")) checkNext(t, itr, true) checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBIterator2(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) + + itr := pdb.Iterator(nil, bz("")) + checkDomain(t, itr, nil, bz("")) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBIterator3(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) + + itr := 
pdb.Iterator(bz(""), nil) + checkDomain(t, itr, bz(""), nil) + checkItem(t, itr, bz(""), bz("value")) + checkNext(t, itr, true) + checkItem(t, itr, bz("1"), bz("value1")) + checkNext(t, itr, true) + checkItem(t, itr, bz("2"), bz("value2")) + checkNext(t, itr, true) + checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBIterator4(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) + + itr := pdb.Iterator(bz(""), bz("")) + checkDomain(t, itr, bz(""), bz("")) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator1(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) + + itr := pdb.ReverseIterator(nil, nil) + checkDomain(t, itr, nil, nil) + checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, true) + checkItem(t, itr, bz("2"), bz("value2")) + checkNext(t, itr, true) + checkItem(t, itr, bz("1"), bz("value1")) + checkNext(t, itr, true) + checkItem(t, itr, bz(""), bz("value")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator2(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) + + itr := pdb.ReverseIterator(nil, bz("")) + checkDomain(t, itr, nil, bz("")) + checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, true) + checkItem(t, itr, bz("2"), bz("value2")) + checkNext(t, itr, true) + checkItem(t, itr, bz("1"), bz("value1")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator3(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) + + itr := pdb.ReverseIterator(bz(""), nil) + checkDomain(t, itr, bz(""), nil) + checkItem(t, itr, bz(""), bz("value")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator4(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, bz("key")) + + itr := pdb.ReverseIterator(bz(""), bz("")) + checkInvalid(t, itr) itr.Close() } diff --git a/db/util.go b/db/util.go index 1ad5002d..51277ac4 100644 --- a/db/util.go +++ b/db/util.go @@ -33,6 +33,29 @@ func cpIncr(bz []byte) (ret []byte) { return nil } +// Returns a slice of the same length (big endian) +// except decremented by one. +// Returns nil on underflow (e.g. if bz bytes are all 0x00) +// CONTRACT: len(bz) > 0 +func cpDecr(bz []byte) (ret []byte) { + if len(bz) == 0 { + panic("cpDecr expects non-zero bz length") + } + ret = cp(bz) + for i := len(bz) - 1; i >= 0; i-- { + if ret[i] > byte(0x00) { + ret[i]-- + return + } + ret[i] = byte(0xFF) + if i == 0 { + // Underflow + return nil + } + } + return nil +} + // See DB interface documentation for more information. 
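A quick check of the new cpDecr helper above. It is unexported, so this sketch lives in package db; the function name is illustrative, and the expected values follow directly from the loop.

package db

import "fmt"

func cpDecrSketch() {
	fmt.Printf("%X\n", cpDecr([]byte{0x05, 0x01})) // 0500
	fmt.Printf("%X\n", cpDecr([]byte{0x05, 0x00})) // 04FF
	fmt.Println(cpDecr([]byte{0x00, 0x00}))        // [] (nil: underflow)
}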
func IsKeyInDomain(key, start, end []byte, isReverse bool) bool { if !isReverse { @@ -43,12 +66,13 @@ func IsKeyInDomain(key, start, end []byte, isReverse bool) bool { return false } return true + } else { + if start != nil && bytes.Compare(start, key) < 0 { + return false + } + if end != nil && bytes.Compare(key, end) <= 0 { + return false + } + return true } - if start != nil && bytes.Compare(start, key) < 0 { - return false - } - if end != nil && bytes.Compare(key, end) <= 0 { - return false - } - return true } diff --git a/merkle/simple_map.go b/merkle/simple_map.go index b59e3b4b..cd38de76 100644 --- a/merkle/simple_map.go +++ b/merkle/simple_map.go @@ -60,9 +60,9 @@ func (sm *SimpleMap) KVPairs() cmn.KVPairs { //---------------------------------------- // A local extension to KVPair that can be hashed. -type kvPair cmn.KVPair +type KVPair cmn.KVPair -func (kv kvPair) Hash() []byte { +func (kv KVPair) Hash() []byte { hasher := ripemd160.New() err := encodeByteSlice(hasher, kv.Key) if err != nil { @@ -78,7 +78,7 @@ func (kv kvPair) Hash() []byte { func hashKVPairs(kvs cmn.KVPairs) []byte { kvsH := make([]Hasher, 0, len(kvs)) for _, kvp := range kvs { - kvsH = append(kvsH, kvPair(kvp)) + kvsH = append(kvsH, KVPair(kvp)) } return SimpleHashFromHashers(kvsH) } diff --git a/merkle/simple_proof.go b/merkle/simple_proof.go index c81ed674..ca6ccf37 100644 --- a/merkle/simple_proof.go +++ b/merkle/simple_proof.go @@ -22,6 +22,20 @@ func SimpleProofsFromHashers(items []Hasher) (rootHash []byte, proofs []*SimpleP return } +func SimpleProofsFromMap(m map[string]Hasher) (rootHash []byte, proofs []*SimpleProof) { + sm := NewSimpleMap() + for k, v := range m { + sm.Set(k, v) + } + sm.Sort() + kvs := sm.kvs + kvsH := make([]Hasher, 0, len(kvs)) + for _, kvp := range kvs { + kvsH = append(kvsH, KVPair(kvp)) + } + return SimpleProofsFromHashers(kvsH) +} + // Verify that leafHash is a leaf hash of the simple-merkle-tree // which hashes to rootHash. func (sp *SimpleProof) Verify(index int, total int, leafHash []byte, rootHash []byte) bool { diff --git a/merkle/simple_tree_test.go b/merkle/simple_tree_test.go index 26f35c80..8c4ed01f 100644 --- a/merkle/simple_tree_test.go +++ b/merkle/simple_tree_test.go @@ -3,7 +3,7 @@ package merkle import ( "bytes" - . "github.com/tendermint/tmlibs/common" + cmn "github.com/tendermint/tmlibs/common" . 
"github.com/tendermint/tmlibs/test" "testing" @@ -21,7 +21,7 @@ func TestSimpleProof(t *testing.T) { items := make([]Hasher, total) for i := 0; i < total; i++ { - items[i] = testItem(RandBytes(32)) + items[i] = testItem(cmn.RandBytes(32)) } rootHash := SimpleHashFromHashers(items) @@ -53,7 +53,7 @@ func TestSimpleProof(t *testing.T) { // Trail too long should make it fail origAunts := proof.Aunts - proof.Aunts = append(proof.Aunts, RandBytes(32)) + proof.Aunts = append(proof.Aunts, cmn.RandBytes(32)) { ok = proof.Verify(i, total, itemHash, rootHash) if ok { diff --git a/pubsub/example_test.go b/pubsub/example_test.go index 3eda7d32..71f1b9cd 100644 --- a/pubsub/example_test.go +++ b/pubsub/example_test.go @@ -21,7 +21,7 @@ func TestExample(t *testing.T) { ch := make(chan interface{}, 1) err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"}) + err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"})) require.NoError(t, err) assertReceive(t, "Tombstone", ch) } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 90f6e4ae..67f264ac 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -38,18 +38,30 @@ var ( ErrAlreadySubscribed = errors.New("already subscribed") ) +// TagMap is used to associate tags to a message. +// They can be queried by subscribers to choose messages they will received. +type TagMap interface { + // Get returns the value for a key, or nil if no value is present. + // The ok result indicates whether value was found in the tags. + Get(key string) (value interface{}, ok bool) + // Len returns the number of tags. + Len() int +} + +type tagMap map[string]interface{} + type cmd struct { op operation query Query ch chan<- interface{} clientID string msg interface{} - tags map[string]interface{} + tags TagMap } // Query defines an interface for a query to be used for subscribing. type Query interface { - Matches(tags map[string]interface{}) bool + Matches(tags TagMap) bool String() string } @@ -68,6 +80,23 @@ type Server struct { // Option sets a parameter for the server. type Option func(*Server) +// NewTagMap constructs a new immutable tag set from a map. +func NewTagMap(data map[string]interface{}) TagMap { + return tagMap(data) +} + +// Get returns the value for a key, or nil if no value is present. +// The ok result indicates whether value was found in the tags. +func (ts tagMap) Get(key string) (value interface{}, ok bool) { + value, ok = ts[key] + return +} + +// Len returns the number of tags. +func (ts tagMap) Len() int { + return len(ts) +} + // NewServer returns a new server. See the commentary on the Option functions // for a detailed description of how to configure buffering. If no options are // provided, the resulting server's queue is unbuffered. @@ -184,13 +213,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.PublishWithTags(ctx, msg, make(map[string]interface{})) + return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{}))) } // PublishWithTags publishes the given message with the set of tags. The set is // matched with clients queries. If there is a match, the message is sent to // the client. 
-func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { +func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error { select { case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: return nil @@ -302,7 +331,7 @@ func (state *state) removeAll(clientID string) { delete(state.clients, clientID) } -func (state *state) send(msg interface{}, tags map[string]interface{}) { +func (state *state) send(msg interface{}, tags TagMap) { for q, clientToChannelMap := range state.queries { if q.Matches(tags) { for _, ch := range clientToChannelMap { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 2af7cea4..f853d163 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -48,14 +48,14 @@ func TestDifferentClients(t *testing.T) { ch1 := make(chan interface{}, 1) err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Iceman", ch1) ch2 := make(chan interface{}, 1) err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) + err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) require.NoError(t, err) assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch2) @@ -63,7 +63,7 @@ func TestDifferentClients(t *testing.T) { ch3 := make(chan interface{}, 1) err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) + err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"})) require.NoError(t, err) assert.Zero(t, len(ch3)) } @@ -80,7 +80,7 @@ func TestClientSubscribesTwice(t *testing.T) { ch1 := make(chan interface{}, 1) err := s.Subscribe(ctx, clientID, q, ch1) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Goblin Queen", ch1) @@ -88,7 +88,7 @@ func TestClientSubscribesTwice(t *testing.T) { err = s.Subscribe(ctx, clientID, q, ch2) require.Error(t, err) - err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Spider-Man", ch1) } @@ -208,7 +208,7 @@ func benchmarkNClients(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})) } } @@ 
-231,7 +231,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})) } } diff --git a/pubsub/query/empty.go b/pubsub/query/empty.go index 2d60a892..cefdace4 100644 --- a/pubsub/query/empty.go +++ b/pubsub/query/empty.go @@ -1,11 +1,13 @@ package query +import "github.com/tendermint/tmlibs/pubsub" + // Empty query matches any set of tags. type Empty struct { } // Matches always returns true. -func (Empty) Matches(tags map[string]interface{}) bool { +func (Empty) Matches(tags pubsub.TagMap) bool { return true } diff --git a/pubsub/query/empty_test.go b/pubsub/query/empty_test.go index 663acb19..b5e8a300 100644 --- a/pubsub/query/empty_test.go +++ b/pubsub/query/empty_test.go @@ -4,13 +4,14 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub" "github.com/tendermint/tmlibs/pubsub/query" ) func TestEmptyQueryMatchesAnything(t *testing.T) { q := query.Empty{} - assert.True(t, q.Matches(map[string]interface{}{})) - assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"})) - assert.True(t, q.Matches(map[string]interface{}{"Route": 66})) - assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"})) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"}))) } diff --git a/pubsub/query/query.go b/pubsub/query/query.go index 56f2829d..84c3aa18 100644 --- a/pubsub/query/query.go +++ b/pubsub/query/query.go @@ -14,6 +14,8 @@ import ( "strconv" "strings" "time" + + "github.com/tendermint/tmlibs/pubsub" ) // Query holds the query string and the query parser. @@ -145,8 +147,8 @@ func (q *Query) Conditions() []Condition { // // For example, query "name=John" matches tags = {"name": "John"}. More // examples could be found in parser_test.go and query_test.go. -func (q *Query) Matches(tags map[string]interface{}) bool { - if len(tags) == 0 { +func (q *Query) Matches(tags pubsub.TagMap) bool { + if tags.Len() == 0 { return false } @@ -231,9 +233,9 @@ func (q *Query) Matches(tags map[string]interface{}) bool { // value from it to the operand using the operator. 
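The same change from the query side: Matches and match now read tag values through TagMap.Get instead of indexing a raw map. A hedged sketch mirroring the example in the comment that follows, with query syntax as in the existing tests:

package main

import (
	"fmt"

	"github.com/tendermint/tmlibs/pubsub"
	"github.com/tendermint/tmlibs/pubsub/query"
)

func main() {
	q := query.MustParse("tx.gas = 7")
	tags := pubsub.NewTagMap(map[string]interface{}{"tx.gas": 7, "tx.ID": "4AE393495334"})
	fmt.Println(q.Matches(tags)) // true: the value is looked up via tags.Get("tx.gas")
}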
// // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } -func match(tag string, op Operator, operand reflect.Value, tags map[string]interface{}) bool { +func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool { // look up the tag from the query in tags - value, ok := tags[tag] + value, ok := tags.Get(tag) if !ok { return false } diff --git a/pubsub/query/query_test.go b/pubsub/query/query_test.go index b980a79c..7d3ac6ba 100644 --- a/pubsub/query/query_test.go +++ b/pubsub/query/query_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tmlibs/pubsub" "github.com/tendermint/tmlibs/pubsub/query" ) @@ -51,9 +52,9 @@ func TestMatches(t *testing.T) { } if tc.matches { - assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags) + assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags) } else { - assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags) + assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags) } } } diff --git a/version/version.go b/version/version.go index b389a63a..40472c9a 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,3 @@ package version -const Version = "0.8.1" +const Version = "0.8.3"