expect all tags to be strings (#1498)

* expect all tags to be strings

Refs #1369

* port changes from https://github.com/tendermint/tmlibs/pull/204

Refs #1369
This commit is contained in:
Anton Kaliaev 2018-05-28 14:37:11 +04:00 committed by GitHub
parent 7f20eb5f8e
commit 6004587347
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 108 additions and 105 deletions

View File

@ -1,5 +1,12 @@
# Changelog # Changelog
## 0.20.0
BREAKING:
- [libs/pubsub] TagMap#Get returns a string value
- [libs/pubsub] NewTagMap accepts a map of strings
## 0.19.6 ## 0.19.6
FEATURES FEATURES

View File

@ -22,7 +22,7 @@ func TestExample(t *testing.T) {
ch := make(chan interface{}, 1) ch := make(chan interface{}, 1)
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"})) err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Tombstone", ch) assertReceive(t, "Tombstone", ch)
} }

View File

@ -38,18 +38,6 @@ var (
ErrAlreadySubscribed = errors.New("already subscribed") ErrAlreadySubscribed = errors.New("already subscribed")
) )
// TagMap is used to associate tags to a message.
// They can be queried by subscribers to choose messages they will received.
type TagMap interface {
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
Get(key string) (value interface{}, ok bool)
// Len returns the number of tags.
Len() int
}
type tagMap map[string]interface{}
type cmd struct { type cmd struct {
op operation op operation
query Query query Query
@ -80,14 +68,28 @@ type Server struct {
// Option sets a parameter for the server. // Option sets a parameter for the server.
type Option func(*Server) type Option func(*Server)
// TagMap is used to associate tags to a message.
// They can be queried by subscribers to choose messages they will received.
type TagMap interface {
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
Get(key string) (value string, ok bool)
// Len returns the number of tags.
Len() int
}
type tagMap map[string]string
var _ TagMap = (*tagMap)(nil)
// NewTagMap constructs a new immutable tag set from a map. // NewTagMap constructs a new immutable tag set from a map.
func NewTagMap(data map[string]interface{}) TagMap { func NewTagMap(data map[string]string) TagMap {
return tagMap(data) return tagMap(data)
} }
// Get returns the value for a key, or nil if no value is present. // Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags. // The ok result indicates whether value was found in the tags.
func (ts tagMap) Get(key string) (value interface{}, ok bool) { func (ts tagMap) Get(key string) (value string, ok bool) {
value, ok = ts[key] value, ok = ts[key]
return return
} }
@ -213,7 +215,7 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
// Publish publishes the given message. An error will be returned to the caller // Publish publishes the given message. An error will be returned to the caller
// if the context is canceled. // if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error { func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{}))) return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
} }
// PublishWithTags publishes the given message with the set of tags. The set is // PublishWithTags publishes the given message with the set of tags. The set is

View File

@ -49,14 +49,14 @@ func TestDifferentClients(t *testing.T) {
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Iceman", ch1) assertReceive(t, "Iceman", ch1)
ch2 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch1)
assertReceive(t, "Ultimo", ch2) assertReceive(t, "Ultimo", ch2)
@ -64,7 +64,7 @@ func TestDifferentClients(t *testing.T) {
ch3 := make(chan interface{}, 1) ch3 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"})) err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"}))
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, len(ch3)) assert.Zero(t, len(ch3))
} }
@ -81,7 +81,7 @@ func TestClientSubscribesTwice(t *testing.T) {
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, q, ch1) err := s.Subscribe(ctx, clientID, q, ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Goblin Queen", ch1) assertReceive(t, "Goblin Queen", ch1)
@ -89,7 +89,7 @@ func TestClientSubscribesTwice(t *testing.T) {
err = s.Subscribe(ctx, clientID, q, ch2) err = s.Subscribe(ctx, clientID, q, ch2)
require.Error(t, err) require.Error(t, err)
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Spider-Man", ch1) assertReceive(t, "Spider-Man", ch1)
} }
@ -209,7 +209,7 @@ func benchmarkNClients(n int, b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})) s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)}))
} }
} }
@ -232,7 +232,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})) s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"}))
} }
} }

View File

@ -11,8 +11,8 @@ import (
func TestEmptyQueryMatchesAnything(t *testing.T) { func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{} q := query.Empty{}
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{}))) assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"}))) assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Asher": "Roth"})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66}))) assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66"})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"}))) assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66", "Billy": "Blue"})))
} }

View File

@ -77,6 +77,13 @@ const (
OpContains OpContains
) )
const (
// DateLayout defines a layout for all dates (`DATE date`)
DateLayout = "2006-01-02"
// TimeLayout defines a layout for all times (`TIME time`)
TimeLayout = time.RFC3339
)
// Conditions returns a list of conditions. // Conditions returns a list of conditions.
func (q *Query) Conditions() []Condition { func (q *Query) Conditions() []Condition {
conditions := make([]Condition, 0) conditions := make([]Condition, 0)
@ -112,7 +119,7 @@ func (q *Query) Conditions() []Condition {
conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes}) conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes})
case rulenumber: case rulenumber:
number := buffer[begin:end] number := buffer[begin:end]
if strings.Contains(number, ".") { // if it looks like a floating-point number if strings.ContainsAny(number, ".") { // if it looks like a floating-point number
value, err := strconv.ParseFloat(number, 64) value, err := strconv.ParseFloat(number, 64)
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
@ -126,7 +133,7 @@ func (q *Query) Conditions() []Condition {
conditions = append(conditions, Condition{tag, op, value}) conditions = append(conditions, Condition{tag, op, value})
} }
case ruletime: case ruletime:
value, err := time.Parse(time.RFC3339, buffer[begin:end]) value, err := time.Parse(TimeLayout, buffer[begin:end])
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
} }
@ -188,7 +195,7 @@ func (q *Query) Matches(tags pubsub.TagMap) bool {
} }
case rulenumber: case rulenumber:
number := buffer[begin:end] number := buffer[begin:end]
if strings.Contains(number, ".") { // if it looks like a floating-point number if strings.ContainsAny(number, ".") { // if it looks like a floating-point number
value, err := strconv.ParseFloat(number, 64) value, err := strconv.ParseFloat(number, 64)
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
@ -206,7 +213,7 @@ func (q *Query) Matches(tags pubsub.TagMap) bool {
} }
} }
case ruletime: case ruletime:
value, err := time.Parse(time.RFC3339, buffer[begin:end]) value, err := time.Parse(TimeLayout, buffer[begin:end])
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
} }
@ -242,9 +249,18 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b
switch operand.Kind() { switch operand.Kind() {
case reflect.Struct: // time case reflect.Struct: // time
operandAsTime := operand.Interface().(time.Time) operandAsTime := operand.Interface().(time.Time)
v, ok := value.(time.Time) // try our best to convert value from tags to time.Time
if !ok { // if value from tags is not time.Time var (
return false v time.Time
err error
)
if strings.ContainsAny(value, "T") {
v, err = time.Parse(TimeLayout, value)
} else {
v, err = time.Parse(DateLayout, value)
}
if err != nil {
panic(fmt.Sprintf("Failed to convert value %v from tag to time.Time: %v", value, err))
} }
switch op { switch op {
case OpLessEqual: case OpLessEqual:
@ -262,23 +278,9 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b
operandFloat64 := operand.Interface().(float64) operandFloat64 := operand.Interface().(float64)
var v float64 var v float64
// try our best to convert value from tags to float64 // try our best to convert value from tags to float64
switch vt := value.(type) { v, err := strconv.ParseFloat(value, 64)
case float64: if err != nil {
v = vt panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err))
case float32:
v = float64(vt)
case int:
v = float64(vt)
case int8:
v = float64(vt)
case int16:
v = float64(vt)
case int32:
v = float64(vt)
case int64:
v = float64(vt)
default: // fail for all other types
panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64))
} }
switch op { switch op {
case OpLessEqual: case OpLessEqual:
@ -295,24 +297,20 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b
case reflect.Int64: case reflect.Int64:
operandInt := operand.Interface().(int64) operandInt := operand.Interface().(int64)
var v int64 var v int64
// try our best to convert value from tags to int64 // if value looks like float, we try to parse it as float
switch vt := value.(type) { if strings.ContainsAny(value, ".") {
case int64: v1, err := strconv.ParseFloat(value, 64)
v = vt if err != nil {
case int8: panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err))
v = int64(vt) }
case int16: v = int64(v1)
v = int64(vt) } else {
case int32: var err error
v = int64(vt) // try our best to convert value from tags to int64
case int: v, err = strconv.ParseInt(value, 10, 64)
v = int64(vt) if err != nil {
case float64: panic(fmt.Sprintf("Failed to convert value %v from tag to int64: %v", value, err))
v = int64(vt) }
case float32:
v = int64(vt)
default: // fail for all other types
panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt))
} }
switch op { switch op {
case OpLessEqual: case OpLessEqual:
@ -327,15 +325,11 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b
return v == operandInt return v == operandInt
} }
case reflect.String: case reflect.String:
v, ok := value.(string)
if !ok { // if value from tags is not string
return false
}
switch op { switch op {
case OpEqual: case OpEqual:
return v == operand.String() return value == operand.String()
case OpContains: case OpContains:
return strings.Contains(v, operand.String()) return strings.Contains(value, operand.String())
} }
default: default:
panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind()))

View File

@ -1,6 +1,7 @@
package query_test package query_test
import ( import (
"fmt"
"testing" "testing"
"time" "time"
@ -12,38 +13,37 @@ import (
) )
func TestMatches(t *testing.T) { func TestMatches(t *testing.T) {
const shortForm = "2006-Jan-02" var (
txDate, err := time.Parse(shortForm, "2017-Jan-01") txDate = "2017-01-01"
require.NoError(t, err) txTime = "2018-05-03T14:45:00Z"
txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") )
require.NoError(t, err)
testCases := []struct { testCases := []struct {
s string s string
tags map[string]interface{} tags map[string]string
err bool err bool
matches bool matches bool
}{ }{
{"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, {"tm.events.type='NewBlock'", map[string]string{"tm.events.type": "NewBlock"}, false, true},
{"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, {"tx.gas > 7", map[string]string{"tx.gas": "8"}, false, true},
{"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, {"tx.gas > 7 AND tx.gas < 9", map[string]string{"tx.gas": "8"}, false, true},
{"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, {"body.weight >= 3.5", map[string]string{"body.weight": "3.5"}, false, true},
{"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, {"account.balance < 1000.0", map[string]string{"account.balance": "900"}, false, true},
{"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, {"apples.kg <= 4", map[string]string{"apples.kg": "4.0"}, false, true},
{"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, {"body.weight >= 4.5", map[string]string{"body.weight": fmt.Sprintf("%v", float32(4.5))}, false, true},
{"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, {"oranges.kg < 4 AND watermellons.kg > 10", map[string]string{"oranges.kg": "3", "watermellons.kg": "12"}, false, true},
{"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, {"peaches.kg < 4", map[string]string{"peaches.kg": "5"}, false, false},
{"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, {"tx.date > DATE 2017-01-01", map[string]string{"tx.date": time.Now().Format(query.DateLayout)}, false, true},
{"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, {"tx.date = DATE 2017-01-01", map[string]string{"tx.date": txDate}, false, true},
{"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, {"tx.date = DATE 2018-01-01", map[string]string{"tx.date": txDate}, false, false},
{"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]string{"tx.time": time.Now().Format(query.TimeLayout)}, false, true},
{"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]string{"tx.time": txTime}, false, false},
{"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, {"abci.owner.name CONTAINS 'Igor'", map[string]string{"abci.owner.name": "Igor,Ivan"}, false, true},
{"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, {"abci.owner.name CONTAINS 'Igor'", map[string]string{"abci.owner.name": "Pavel,Ivan"}, false, false},
} }
for _, tc := range testCases { for _, tc := range testCases {

View File

@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
@ -121,7 +121,7 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
{Code: 383}, {Code: 383},
{Data: []byte("Gotcha!"), {Data: []byte("Gotcha!"),
Tags: []cmn.KVPair{ Tags: []cmn.KVPair{
cmn.KVPair{[]byte("a"), []byte{1}}, cmn.KVPair{[]byte("a"), []byte("1")},
cmn.KVPair{[]byte("build"), []byte("stuff")}, cmn.KVPair{[]byte("build"), []byte("stuff")},
}}, }},
}, },

View File

@ -67,7 +67,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
func (b *EventBus) Publish(eventType string, eventData TMEventData) error { func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]interface{}{EventTypeKey: eventType})) b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]string{EventTypeKey: eventType}))
return nil return nil
} }
@ -92,7 +92,7 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
tags := make(map[string]interface{}) tags := make(map[string]string)
// validate and fill tags from tx result // validate and fill tags from tx result
for _, tag := range event.Result.Tags { for _, tag := range event.Result.Tags {
@ -112,7 +112,7 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error {
tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash()) tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash())
logIfTagExists(TxHeightKey, tags, b.Logger) logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = event.Height tags[TxHeightKey] = fmt.Sprintf("%d", event.Height)
b.pubsub.PublishWithTags(ctx, event, tmpubsub.NewTagMap(tags)) b.pubsub.PublishWithTags(ctx, event, tmpubsub.NewTagMap(tags))
return nil return nil
@ -160,7 +160,7 @@ func (b *EventBus) PublishEventLock(event EventDataRoundState) error {
return b.Publish(EventLock, event) return b.Publish(EventLock, event)
} }
func logIfTagExists(tag string, tags map[string]interface{}, logger log.Logger) { func logIfTagExists(tag string, tags map[string]string, logger log.Logger) {
if value, ok := tags[tag]; ok { if value, ok := tags[tag]; ok {
logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value) logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value)
} }

View File

@ -23,12 +23,12 @@ func TestEventBusPublishEventTx(t *testing.T) {
defer eventBus.Stop() defer eventBus.Stop()
tx := Tx("foo") tx := Tx("foo")
result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}} result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{{[]byte("baz"), []byte("1")}}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}
txEventsCh := make(chan interface{}) txEventsCh := make(chan interface{})
// PublishEventTx adds all these 3 tags, so the query below should work // PublishEventTx adds all these 3 tags, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X'", tx.Hash()) query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash())
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err) require.NoError(t, err)