diff --git a/events/Makefile b/events/Makefile deleted file mode 100644 index c425ee5a..00000000 --- a/events/Makefile +++ /dev/null @@ -1,9 +0,0 @@ -.PHONY: docs -REPO:=github.com/tendermint/tmlibs/events - -docs: - @go get github.com/davecheney/godoc2md - godoc2md $(REPO) > README.md - -test: - go test -v ./... diff --git a/events/README.md b/events/README.md deleted file mode 100644 index d7469515..00000000 --- a/events/README.md +++ /dev/null @@ -1,175 +0,0 @@ - - -# events -`import "github.com/tendermint/tmlibs/events"` - -* [Overview](#pkg-overview) -* [Index](#pkg-index) - -## Overview -Pub-Sub in go with event caching - - - - -## Index -* [type EventCache](#EventCache) - * [func NewEventCache(evsw Fireable) *EventCache](#NewEventCache) - * [func (evc *EventCache) FireEvent(event string, data EventData)](#EventCache.FireEvent) - * [func (evc *EventCache) Flush()](#EventCache.Flush) -* [type EventCallback](#EventCallback) -* [type EventData](#EventData) -* [type EventSwitch](#EventSwitch) - * [func NewEventSwitch() EventSwitch](#NewEventSwitch) -* [type Eventable](#Eventable) -* [type Fireable](#Fireable) - - -#### Package files -[event_cache.go](/src/github.com/tendermint/tmlibs/events/event_cache.go) [events.go](/src/github.com/tendermint/tmlibs/events/events.go) [log.go](/src/github.com/tendermint/tmlibs/events/log.go) - - - - - - -## type [EventCache](/src/target/event_cache.go?s=152:215#L1) -``` go -type EventCache struct { - // contains filtered or unexported fields -} -``` -An EventCache buffers events for a Fireable -All events are cached. Filtering happens on Flush - - - - - - - -### func [NewEventCache](/src/target/event_cache.go?s=275:320#L5) -``` go -func NewEventCache(evsw Fireable) *EventCache -``` -Create a new EventCache with an EventSwitch as backend - - - - - -### func (\*EventCache) [FireEvent](/src/target/event_cache.go?s=534:596#L19) -``` go -func (evc *EventCache) FireEvent(event string, data EventData) -``` -Cache an event to be fired upon finality. - - - - -### func (\*EventCache) [Flush](/src/target/event_cache.go?s=773:803#L26) -``` go -func (evc *EventCache) Flush() -``` -Fire events by running evsw.FireEvent on all cached events. Blocks. -Clears cached events - - - - -## type [EventCallback](/src/target/events.go?s=4182:4221#L175) -``` go -type EventCallback func(data EventData) -``` - - - - - - - - - -## type [EventData](/src/target/events.go?s=236:287#L4) -``` go -type EventData interface { -} -``` -Generic event data can be typed and registered with tendermint/go-amino -via concrete implementation of this interface - - - - - - - - - - -## type [EventSwitch](/src/target/events.go?s=553:760#L19) -``` go -type EventSwitch interface { - Service - Fireable - - AddListenerForEvent(listenerID, event string, cb EventCallback) - RemoveListenerForEvent(event string, listenerID string) - RemoveListener(listenerID string) -} -``` - - - - - - -### func [NewEventSwitch](/src/target/events.go?s=902:935#L36) -``` go -func NewEventSwitch() EventSwitch -``` - - - - -## type [Eventable](/src/target/events.go?s=371:433#L10) -``` go -type Eventable interface { - SetEventSwitch(evsw EventSwitch) -} -``` -reactors and other modules should export -this interface to become eventable - - - - - - - - - - -## type [Fireable](/src/target/events.go?s=483:551#L15) -``` go -type Fireable interface { - FireEvent(event string, data EventData) -} -``` -an event switch or cache implements fireable - - - - - - - - - - - - - - -- - - -Generated by [godoc2md](http://godoc.org/github.com/davecheney/godoc2md) diff --git a/events/event_cache.go b/events/event_cache.go deleted file mode 100644 index f508e873..00000000 --- a/events/event_cache.go +++ /dev/null @@ -1,37 +0,0 @@ -package events - -// An EventCache buffers events for a Fireable -// All events are cached. Filtering happens on Flush -type EventCache struct { - evsw Fireable - events []eventInfo -} - -// Create a new EventCache with an EventSwitch as backend -func NewEventCache(evsw Fireable) *EventCache { - return &EventCache{ - evsw: evsw, - } -} - -// a cached event -type eventInfo struct { - event string - data EventData -} - -// Cache an event to be fired upon finality. -func (evc *EventCache) FireEvent(event string, data EventData) { - // append to list (go will grow our backing array exponentially) - evc.events = append(evc.events, eventInfo{event, data}) -} - -// Fire events by running evsw.FireEvent on all cached events. Blocks. -// Clears cached events -func (evc *EventCache) Flush() { - for _, ei := range evc.events { - evc.evsw.FireEvent(ei.event, ei.data) - } - // Clear the buffer, since we only add to it with append it's safe to just set it to nil and maybe safe an allocation - evc.events = nil -} diff --git a/events/event_cache_test.go b/events/event_cache_test.go deleted file mode 100644 index ab321da3..00000000 --- a/events/event_cache_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package events - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestEventCache_Flush(t *testing.T) { - evsw := NewEventSwitch() - evsw.Start() - evsw.AddListenerForEvent("nothingness", "", func(data EventData) { - // Check we are not initialising an empty buffer full of zeroed eventInfos in the EventCache - require.FailNow(t, "We should never receive a message on this switch since none are fired") - }) - evc := NewEventCache(evsw) - evc.Flush() - // Check after reset - evc.Flush() - fail := true - pass := false - evsw.AddListenerForEvent("somethingness", "something", func(data EventData) { - if fail { - require.FailNow(t, "Shouldn't see a message until flushed") - } - pass = true - }) - evc.FireEvent("something", struct{ int }{1}) - evc.FireEvent("something", struct{ int }{2}) - evc.FireEvent("something", struct{ int }{3}) - fail = false - evc.Flush() - assert.True(t, pass) -} diff --git a/events/events.go b/events/events.go deleted file mode 100644 index f1b2a754..00000000 --- a/events/events.go +++ /dev/null @@ -1,226 +0,0 @@ -/* -Pub-Sub in go with event caching -*/ -package events - -import ( - "sync" - - cmn "github.com/tendermint/tmlibs/common" -) - -// Generic event data can be typed and registered with tendermint/go-amino -// via concrete implementation of this interface -type EventData interface { - //AssertIsEventData() -} - -// reactors and other modules should export -// this interface to become eventable -type Eventable interface { - SetEventSwitch(evsw EventSwitch) -} - -// an event switch or cache implements fireable -type Fireable interface { - FireEvent(event string, data EventData) -} - -type EventSwitch interface { - cmn.Service - Fireable - - AddListenerForEvent(listenerID, event string, cb EventCallback) - RemoveListenerForEvent(event string, listenerID string) - RemoveListener(listenerID string) -} - -type eventSwitch struct { - cmn.BaseService - - mtx sync.RWMutex - eventCells map[string]*eventCell - listeners map[string]*eventListener -} - -func NewEventSwitch() EventSwitch { - evsw := &eventSwitch{} - evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw) - return evsw -} - -func (evsw *eventSwitch) OnStart() error { - evsw.BaseService.OnStart() - evsw.eventCells = make(map[string]*eventCell) - evsw.listeners = make(map[string]*eventListener) - return nil -} - -func (evsw *eventSwitch) OnStop() { - evsw.mtx.Lock() - defer evsw.mtx.Unlock() - evsw.BaseService.OnStop() - evsw.eventCells = nil - evsw.listeners = nil -} - -func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) { - // Get/Create eventCell and listener - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - if eventCell == nil { - eventCell = newEventCell() - evsw.eventCells[event] = eventCell - } - listener := evsw.listeners[listenerID] - if listener == nil { - listener = newEventListener(listenerID) - evsw.listeners[listenerID] = listener - } - evsw.mtx.Unlock() - - // Add event and listener - eventCell.AddListener(listenerID, cb) - listener.AddEvent(event) -} - -func (evsw *eventSwitch) RemoveListener(listenerID string) { - // Get and remove listener - evsw.mtx.RLock() - listener := evsw.listeners[listenerID] - evsw.mtx.RUnlock() - if listener == nil { - return - } - - evsw.mtx.Lock() - delete(evsw.listeners, listenerID) - evsw.mtx.Unlock() - - // Remove callback for each event. - listener.SetRemoved() - for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerID) - } -} - -func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) { - // Get eventCell - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - evsw.mtx.Unlock() - - if eventCell == nil { - return - } - - // Remove listenerID from eventCell - numListeners := eventCell.RemoveListener(listenerID) - - // Maybe garbage collect eventCell. - if numListeners == 0 { - // Lock again and double check. - evsw.mtx.Lock() // OUTER LOCK - eventCell.mtx.Lock() // INNER LOCK - if len(eventCell.listeners) == 0 { - delete(evsw.eventCells, event) - } - eventCell.mtx.Unlock() // INNER LOCK - evsw.mtx.Unlock() // OUTER LOCK - } -} - -func (evsw *eventSwitch) FireEvent(event string, data EventData) { - // Get the eventCell - evsw.mtx.RLock() - eventCell := evsw.eventCells[event] - evsw.mtx.RUnlock() - - if eventCell == nil { - return - } - - // Fire event for all listeners in eventCell - eventCell.FireEvent(data) -} - -//----------------------------------------------------------------------------- - -// eventCell handles keeping track of listener callbacks for a given event. -type eventCell struct { - mtx sync.RWMutex - listeners map[string]EventCallback -} - -func newEventCell() *eventCell { - return &eventCell{ - listeners: make(map[string]EventCallback), - } -} - -func (cell *eventCell) AddListener(listenerID string, cb EventCallback) { - cell.mtx.Lock() - cell.listeners[listenerID] = cb - cell.mtx.Unlock() -} - -func (cell *eventCell) RemoveListener(listenerID string) int { - cell.mtx.Lock() - delete(cell.listeners, listenerID) - numListeners := len(cell.listeners) - cell.mtx.Unlock() - return numListeners -} - -func (cell *eventCell) FireEvent(data EventData) { - cell.mtx.RLock() - for _, listener := range cell.listeners { - listener(data) - } - cell.mtx.RUnlock() -} - -//----------------------------------------------------------------------------- - -type EventCallback func(data EventData) - -type eventListener struct { - id string - - mtx sync.RWMutex - removed bool - events []string -} - -func newEventListener(id string) *eventListener { - return &eventListener{ - id: id, - removed: false, - events: nil, - } -} - -func (evl *eventListener) AddEvent(event string) { - evl.mtx.Lock() - defer evl.mtx.Unlock() - - if evl.removed { - return - } - evl.events = append(evl.events, event) -} - -func (evl *eventListener) GetEvents() []string { - evl.mtx.RLock() - defer evl.mtx.RUnlock() - - events := make([]string, len(evl.events)) - copy(events, evl.events) - return events -} - -func (evl *eventListener) SetRemoved() { - evl.mtx.Lock() - defer evl.mtx.Unlock() - evl.removed = true -} diff --git a/events/events_test.go b/events/events_test.go deleted file mode 100644 index 4995ae73..00000000 --- a/events/events_test.go +++ /dev/null @@ -1,380 +0,0 @@ -package events - -import ( - "fmt" - "math/rand" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single -// listener to an event, and sends a string "data". -func TestAddListenerForEventFireOnce(t *testing.T) { - evsw := NewEventSwitch() - err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } - messages := make(chan EventData) - evsw.AddListenerForEvent("listener", "event", - func(data EventData) { - messages <- data - }) - go evsw.FireEvent("event", "data") - received := <-messages - if received != "data" { - t.Errorf("Message received does not match: %v", received) - } -} - -// TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single -// listener to an event, and sends a thousand integers. -func TestAddListenerForEventFireMany(t *testing.T) { - evsw := NewEventSwitch() - err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } - doneSum := make(chan uint64) - doneSending := make(chan uint64) - numbers := make(chan uint64, 4) - // subscribe one listener for one event - evsw.AddListenerForEvent("listener", "event", - func(data EventData) { - numbers <- data.(uint64) - }) - // collect received events - go sumReceivedNumbers(numbers, doneSum) - // go fire events - go fireEvents(evsw, "event", doneSending, uint64(1)) - checkSum := <-doneSending - close(numbers) - eventSum := <-doneSum - if checkSum != eventSum { - t.Errorf("Not all messages sent were received.\n") - } -} - -// TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single -// listener to three different events and sends a thousand integers for each -// of the three events. -func TestAddListenerForDifferentEvents(t *testing.T) { - evsw := NewEventSwitch() - err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } - doneSum := make(chan uint64) - doneSending1 := make(chan uint64) - doneSending2 := make(chan uint64) - doneSending3 := make(chan uint64) - numbers := make(chan uint64, 4) - // subscribe one listener to three events - evsw.AddListenerForEvent("listener", "event1", - func(data EventData) { - numbers <- data.(uint64) - }) - evsw.AddListenerForEvent("listener", "event2", - func(data EventData) { - numbers <- data.(uint64) - }) - evsw.AddListenerForEvent("listener", "event3", - func(data EventData) { - numbers <- data.(uint64) - }) - // collect received events - go sumReceivedNumbers(numbers, doneSum) - // go fire events - go fireEvents(evsw, "event1", doneSending1, uint64(1)) - go fireEvents(evsw, "event2", doneSending2, uint64(1)) - go fireEvents(evsw, "event3", doneSending3, uint64(1)) - var checkSum uint64 = 0 - checkSum += <-doneSending1 - checkSum += <-doneSending2 - checkSum += <-doneSending3 - close(numbers) - eventSum := <-doneSum - if checkSum != eventSum { - t.Errorf("Not all messages sent were received.\n") - } -} - -// TestAddDifferentListenerForDifferentEvents sets up an EventSwitch, -// subscribes a first listener to three events, and subscribes a second -// listener to two of those three events, and then sends a thousand integers -// for each of the three events. -func TestAddDifferentListenerForDifferentEvents(t *testing.T) { - evsw := NewEventSwitch() - err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } - doneSum1 := make(chan uint64) - doneSum2 := make(chan uint64) - doneSending1 := make(chan uint64) - doneSending2 := make(chan uint64) - doneSending3 := make(chan uint64) - numbers1 := make(chan uint64, 4) - numbers2 := make(chan uint64, 4) - // subscribe two listener to three events - evsw.AddListenerForEvent("listener1", "event1", - func(data EventData) { - numbers1 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener1", "event2", - func(data EventData) { - numbers1 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener1", "event3", - func(data EventData) { - numbers1 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener2", "event2", - func(data EventData) { - numbers2 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener2", "event3", - func(data EventData) { - numbers2 <- data.(uint64) - }) - // collect received events for listener1 - go sumReceivedNumbers(numbers1, doneSum1) - // collect received events for listener2 - go sumReceivedNumbers(numbers2, doneSum2) - // go fire events - go fireEvents(evsw, "event1", doneSending1, uint64(1)) - go fireEvents(evsw, "event2", doneSending2, uint64(1001)) - go fireEvents(evsw, "event3", doneSending3, uint64(2001)) - checkSumEvent1 := <-doneSending1 - checkSumEvent2 := <-doneSending2 - checkSumEvent3 := <-doneSending3 - checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3 - checkSum2 := checkSumEvent2 + checkSumEvent3 - close(numbers1) - close(numbers2) - eventSum1 := <-doneSum1 - eventSum2 := <-doneSum2 - if checkSum1 != eventSum1 || - checkSum2 != eventSum2 { - t.Errorf("Not all messages sent were received for different listeners to different events.\n") - } -} - -// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to -// two events, fires a thousand integers for the first event, then unsubscribes -// the listener and fires a thousand integers for the second event. -func TestAddAndRemoveListener(t *testing.T) { - evsw := NewEventSwitch() - err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } - doneSum1 := make(chan uint64) - doneSum2 := make(chan uint64) - doneSending1 := make(chan uint64) - doneSending2 := make(chan uint64) - numbers1 := make(chan uint64, 4) - numbers2 := make(chan uint64, 4) - // subscribe two listener to three events - evsw.AddListenerForEvent("listener", "event1", - func(data EventData) { - numbers1 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener", "event2", - func(data EventData) { - numbers2 <- data.(uint64) - }) - // collect received events for event1 - go sumReceivedNumbers(numbers1, doneSum1) - // collect received events for event2 - go sumReceivedNumbers(numbers2, doneSum2) - // go fire events - go fireEvents(evsw, "event1", doneSending1, uint64(1)) - checkSumEvent1 := <-doneSending1 - // after sending all event1, unsubscribe for all events - evsw.RemoveListener("listener") - go fireEvents(evsw, "event2", doneSending2, uint64(1001)) - checkSumEvent2 := <-doneSending2 - close(numbers1) - close(numbers2) - eventSum1 := <-doneSum1 - eventSum2 := <-doneSum2 - if checkSumEvent1 != eventSum1 || - // correct value asserted by preceding tests, suffices to be non-zero - checkSumEvent2 == uint64(0) || - eventSum2 != uint64(0) { - t.Errorf("Not all messages sent were received or unsubscription did not register.\n") - } -} - -// TestRemoveListener does basic tests on adding and removing -func TestRemoveListener(t *testing.T) { - evsw := NewEventSwitch() - err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } - count := 10 - sum1, sum2 := 0, 0 - // add some listeners and make sure they work - evsw.AddListenerForEvent("listener", "event1", - func(data EventData) { - sum1++ - }) - evsw.AddListenerForEvent("listener", "event2", - func(data EventData) { - sum2++ - }) - for i := 0; i < count; i++ { - evsw.FireEvent("event1", true) - evsw.FireEvent("event2", true) - } - assert.Equal(t, count, sum1) - assert.Equal(t, count, sum2) - - // remove one by event and make sure it is gone - evsw.RemoveListenerForEvent("event2", "listener") - for i := 0; i < count; i++ { - evsw.FireEvent("event1", true) - evsw.FireEvent("event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) - - // remove the listener entirely and make sure both gone - evsw.RemoveListener("listener") - for i := 0; i < count; i++ { - evsw.FireEvent("event1", true) - evsw.FireEvent("event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) -} - -// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two -// listeners to three events, and fires a thousand integers for each event. -// These two listeners serve as the baseline validation while other listeners -// are randomly subscribed and unsubscribed. -// More precisely it randomly subscribes new listeners (different from the first -// two listeners) to one of these three events. At the same time it starts -// randomly unsubscribing these additional listeners from all events they are -// at that point subscribed to. -// NOTE: it is important to run this test with race conditions tracking on, -// `go test -race`, to examine for possible race conditions. -func TestRemoveListenersAsync(t *testing.T) { - evsw := NewEventSwitch() - err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } - doneSum1 := make(chan uint64) - doneSum2 := make(chan uint64) - doneSending1 := make(chan uint64) - doneSending2 := make(chan uint64) - doneSending3 := make(chan uint64) - numbers1 := make(chan uint64, 4) - numbers2 := make(chan uint64, 4) - // subscribe two listener to three events - evsw.AddListenerForEvent("listener1", "event1", - func(data EventData) { - numbers1 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener1", "event2", - func(data EventData) { - numbers1 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener1", "event3", - func(data EventData) { - numbers1 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener2", "event1", - func(data EventData) { - numbers2 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener2", "event2", - func(data EventData) { - numbers2 <- data.(uint64) - }) - evsw.AddListenerForEvent("listener2", "event3", - func(data EventData) { - numbers2 <- data.(uint64) - }) - // collect received events for event1 - go sumReceivedNumbers(numbers1, doneSum1) - // collect received events for event2 - go sumReceivedNumbers(numbers2, doneSum2) - addListenersStress := func() { - s1 := rand.NewSource(time.Now().UnixNano()) - r1 := rand.New(s1) - for k := uint16(0); k < 400; k++ { - listenerNumber := r1.Intn(100) + 3 - eventNumber := r1.Intn(3) + 1 - go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber), - fmt.Sprintf("event%v", eventNumber), - func(_ EventData) {}) - } - } - removeListenersStress := func() { - s2 := rand.NewSource(time.Now().UnixNano()) - r2 := rand.New(s2) - for k := uint16(0); k < 80; k++ { - listenerNumber := r2.Intn(100) + 3 - go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber)) - } - } - addListenersStress() - // go fire events - go fireEvents(evsw, "event1", doneSending1, uint64(1)) - removeListenersStress() - go fireEvents(evsw, "event2", doneSending2, uint64(1001)) - go fireEvents(evsw, "event3", doneSending3, uint64(2001)) - checkSumEvent1 := <-doneSending1 - checkSumEvent2 := <-doneSending2 - checkSumEvent3 := <-doneSending3 - checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3 - close(numbers1) - close(numbers2) - eventSum1 := <-doneSum1 - eventSum2 := <-doneSum2 - if checkSum != eventSum1 || - checkSum != eventSum2 { - t.Errorf("Not all messages sent were received.\n") - } -} - -//------------------------------------------------------------------------------ -// Helper functions - -// sumReceivedNumbers takes two channels and adds all numbers received -// until the receiving channel `numbers` is closed; it then sends the sum -// on `doneSum` and closes that channel. Expected to be run in a go-routine. -func sumReceivedNumbers(numbers, doneSum chan uint64) { - var sum uint64 = 0 - for { - j, more := <-numbers - sum += j - if !more { - doneSum <- sum - close(doneSum) - return - } - } -} - -// fireEvents takes an EventSwitch and fires a thousand integers under -// a given `event` with the integers mootonically increasing from `offset` -// to `offset` + 999. It additionally returns the addition of all integers -// sent on `doneChan` for assertion that all events have been sent, and enabling -// the test to assert all events have also been received. -func fireEvents(evsw EventSwitch, event string, doneChan chan uint64, - offset uint64) { - var sentSum uint64 = 0 - for i := offset; i <= offset+uint64(999); i++ { - sentSum += i - evsw.FireEvent(event, i) - } - doneChan <- sentSum - close(doneChan) -} diff --git a/pubsub/example_test.go b/pubsub/example_test.go deleted file mode 100644 index 71f1b9cd..00000000 --- a/pubsub/example_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package pubsub_test - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/tendermint/tmlibs/log" - "github.com/tendermint/tmlibs/pubsub" - "github.com/tendermint/tmlibs/pubsub/query" -) - -func TestExample(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ctx := context.Background() - ch := make(chan interface{}, 1) - err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) - require.NoError(t, err) - err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"})) - require.NoError(t, err) - assertReceive(t, "Tombstone", ch) -} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go deleted file mode 100644 index 67f264ac..00000000 --- a/pubsub/pubsub.go +++ /dev/null @@ -1,342 +0,0 @@ -// Package pubsub implements a pub-sub model with a single publisher (Server) -// and multiple subscribers (clients). -// -// Though you can have multiple publishers by sharing a pointer to a server or -// by giving the same channel to each publisher and publishing messages from -// that channel (fan-in). -// -// Clients subscribe for messages, which could be of any type, using a query. -// When some message is published, we match it with all queries. If there is a -// match, this message will be pushed to all clients, subscribed to that query. -// See query subpackage for our implementation. -package pubsub - -import ( - "context" - "errors" - "sync" - - cmn "github.com/tendermint/tmlibs/common" -) - -type operation int - -const ( - sub operation = iota - pub - unsub - shutdown -) - -var ( - // ErrSubscriptionNotFound is returned when a client tries to unsubscribe - // from not existing subscription. - ErrSubscriptionNotFound = errors.New("subscription not found") - - // ErrAlreadySubscribed is returned when a client tries to subscribe twice or - // more using the same query. - ErrAlreadySubscribed = errors.New("already subscribed") -) - -// TagMap is used to associate tags to a message. -// They can be queried by subscribers to choose messages they will received. -type TagMap interface { - // Get returns the value for a key, or nil if no value is present. - // The ok result indicates whether value was found in the tags. - Get(key string) (value interface{}, ok bool) - // Len returns the number of tags. - Len() int -} - -type tagMap map[string]interface{} - -type cmd struct { - op operation - query Query - ch chan<- interface{} - clientID string - msg interface{} - tags TagMap -} - -// Query defines an interface for a query to be used for subscribing. -type Query interface { - Matches(tags TagMap) bool - String() string -} - -// Server allows clients to subscribe/unsubscribe for messages, publishing -// messages with or without tags, and manages internal state. -type Server struct { - cmn.BaseService - - cmds chan cmd - cmdsCap int - - mtx sync.RWMutex - subscriptions map[string]map[string]Query // subscriber -> query (string) -> Query -} - -// Option sets a parameter for the server. -type Option func(*Server) - -// NewTagMap constructs a new immutable tag set from a map. -func NewTagMap(data map[string]interface{}) TagMap { - return tagMap(data) -} - -// Get returns the value for a key, or nil if no value is present. -// The ok result indicates whether value was found in the tags. -func (ts tagMap) Get(key string) (value interface{}, ok bool) { - value, ok = ts[key] - return -} - -// Len returns the number of tags. -func (ts tagMap) Len() int { - return len(ts) -} - -// NewServer returns a new server. See the commentary on the Option functions -// for a detailed description of how to configure buffering. If no options are -// provided, the resulting server's queue is unbuffered. -func NewServer(options ...Option) *Server { - s := &Server{ - subscriptions: make(map[string]map[string]Query), - } - s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) - - for _, option := range options { - option(s) - } - - // if BufferCapacity option was not set, the channel is unbuffered - s.cmds = make(chan cmd, s.cmdsCap) - - return s -} - -// BufferCapacity allows you to specify capacity for the internal server's -// queue. Since the server, given Y subscribers, could only process X messages, -// this option could be used to survive spikes (e.g. high amount of -// transactions during peak hours). -func BufferCapacity(cap int) Option { - return func(s *Server) { - if cap > 0 { - s.cmdsCap = cap - } - } -} - -// BufferCapacity returns capacity of the internal server's queue. -func (s *Server) BufferCapacity() int { - return s.cmdsCap -} - -// Subscribe creates a subscription for the given client. It accepts a channel -// on which messages matching the given query can be received. An error will be -// returned to the caller if the context is canceled or if subscription already -// exist for pair clientID and query. -func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { - s.mtx.RLock() - clientSubscriptions, ok := s.subscriptions[clientID] - if ok { - _, ok = clientSubscriptions[query.String()] - } - s.mtx.RUnlock() - if ok { - return ErrAlreadySubscribed - } - - select { - case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: - s.mtx.Lock() - if _, ok = s.subscriptions[clientID]; !ok { - s.subscriptions[clientID] = make(map[string]Query) - } - s.subscriptions[clientID][query.String()] = query - s.mtx.Unlock() - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -// Unsubscribe removes the subscription on the given query. An error will be -// returned to the caller if the context is canceled or if subscription does -// not exist. -func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { - var origQuery Query - s.mtx.RLock() - clientSubscriptions, ok := s.subscriptions[clientID] - if ok { - origQuery, ok = clientSubscriptions[query.String()] - } - s.mtx.RUnlock() - if !ok { - return ErrSubscriptionNotFound - } - - // original query is used here because we're using pointers as map keys - select { - case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}: - s.mtx.Lock() - delete(clientSubscriptions, query.String()) - s.mtx.Unlock() - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -// UnsubscribeAll removes all client subscriptions. An error will be returned -// to the caller if the context is canceled or if subscription does not exist. -func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { - s.mtx.RLock() - _, ok := s.subscriptions[clientID] - s.mtx.RUnlock() - if !ok { - return ErrSubscriptionNotFound - } - - select { - case s.cmds <- cmd{op: unsub, clientID: clientID}: - s.mtx.Lock() - delete(s.subscriptions, clientID) - s.mtx.Unlock() - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -// Publish publishes the given message. An error will be returned to the caller -// if the context is canceled. -func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{}))) -} - -// PublishWithTags publishes the given message with the set of tags. The set is -// matched with clients queries. If there is a match, the message is sent to -// the client. -func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error { - select { - case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -// OnStop implements Service.OnStop by shutting down the server. -func (s *Server) OnStop() { - s.cmds <- cmd{op: shutdown} -} - -// NOTE: not goroutine safe -type state struct { - // query -> client -> ch - queries map[Query]map[string]chan<- interface{} - // client -> query -> struct{} - clients map[string]map[Query]struct{} -} - -// OnStart implements Service.OnStart by starting the server. -func (s *Server) OnStart() error { - go s.loop(state{ - queries: make(map[Query]map[string]chan<- interface{}), - clients: make(map[string]map[Query]struct{}), - }) - return nil -} - -// OnReset implements Service.OnReset -func (s *Server) OnReset() error { - return nil -} - -func (s *Server) loop(state state) { -loop: - for cmd := range s.cmds { - switch cmd.op { - case unsub: - if cmd.query != nil { - state.remove(cmd.clientID, cmd.query) - } else { - state.removeAll(cmd.clientID) - } - case shutdown: - for clientID := range state.clients { - state.removeAll(clientID) - } - break loop - case sub: - state.add(cmd.clientID, cmd.query, cmd.ch) - case pub: - state.send(cmd.msg, cmd.tags) - } - } -} - -func (state *state) add(clientID string, q Query, ch chan<- interface{}) { - // add query if needed - if _, ok := state.queries[q]; !ok { - state.queries[q] = make(map[string]chan<- interface{}) - } - - // create subscription - state.queries[q][clientID] = ch - - // add client if needed - if _, ok := state.clients[clientID]; !ok { - state.clients[clientID] = make(map[Query]struct{}) - } - state.clients[clientID][q] = struct{}{} -} - -func (state *state) remove(clientID string, q Query) { - clientToChannelMap, ok := state.queries[q] - if !ok { - return - } - - ch, ok := clientToChannelMap[clientID] - if ok { - close(ch) - - delete(state.clients[clientID], q) - - // if it not subscribed to anything else, remove the client - if len(state.clients[clientID]) == 0 { - delete(state.clients, clientID) - } - - delete(state.queries[q], clientID) - } -} - -func (state *state) removeAll(clientID string) { - queryMap, ok := state.clients[clientID] - if !ok { - return - } - - for q := range queryMap { - ch := state.queries[q][clientID] - close(ch) - - delete(state.queries[q], clientID) - } - - delete(state.clients, clientID) -} - -func (state *state) send(msg interface{}, tags TagMap) { - for q, clientToChannelMap := range state.queries { - if q.Matches(tags) { - for _, ch := range clientToChannelMap { - ch <- msg - } - } - } -} diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go deleted file mode 100644 index f853d163..00000000 --- a/pubsub/pubsub_test.go +++ /dev/null @@ -1,252 +0,0 @@ -package pubsub_test - -import ( - "context" - "fmt" - "runtime/debug" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/tendermint/tmlibs/log" - "github.com/tendermint/tmlibs/pubsub" - "github.com/tendermint/tmlibs/pubsub/query" -) - -const ( - clientID = "test-client" -) - -func TestSubscribe(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ctx := context.Background() - ch := make(chan interface{}, 1) - err := s.Subscribe(ctx, clientID, query.Empty{}, ch) - require.NoError(t, err) - err = s.Publish(ctx, "Ka-Zar") - require.NoError(t, err) - assertReceive(t, "Ka-Zar", ch) - - err = s.Publish(ctx, "Quicksilver") - require.NoError(t, err) - assertReceive(t, "Quicksilver", ch) -} - -func TestDifferentClients(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ctx := context.Background() - ch1 := make(chan interface{}, 1) - err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) - require.NoError(t, err) - err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) - require.NoError(t, err) - assertReceive(t, "Iceman", ch1) - - ch2 := make(chan interface{}, 1) - err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) - require.NoError(t, err) - err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) - require.NoError(t, err) - assertReceive(t, "Ultimo", ch1) - assertReceive(t, "Ultimo", ch2) - - ch3 := make(chan interface{}, 1) - err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) - require.NoError(t, err) - err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"})) - require.NoError(t, err) - assert.Zero(t, len(ch3)) -} - -func TestClientSubscribesTwice(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ctx := context.Background() - q := query.MustParse("tm.events.type='NewBlock'") - - ch1 := make(chan interface{}, 1) - err := s.Subscribe(ctx, clientID, q, ch1) - require.NoError(t, err) - err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) - require.NoError(t, err) - assertReceive(t, "Goblin Queen", ch1) - - ch2 := make(chan interface{}, 1) - err = s.Subscribe(ctx, clientID, q, ch2) - require.Error(t, err) - - err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) - require.NoError(t, err) - assertReceive(t, "Spider-Man", ch1) -} - -func TestUnsubscribe(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ctx := context.Background() - ch := make(chan interface{}) - err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch) - require.NoError(t, err) - err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) - require.NoError(t, err) - - err = s.Publish(ctx, "Nick Fury") - require.NoError(t, err) - assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") - - _, ok := <-ch - assert.False(t, ok) -} - -func TestResubscribe(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ctx := context.Background() - ch := make(chan interface{}) - err := s.Subscribe(ctx, clientID, query.Empty{}, ch) - require.NoError(t, err) - err = s.Unsubscribe(ctx, clientID, query.Empty{}) - require.NoError(t, err) - ch = make(chan interface{}) - err = s.Subscribe(ctx, clientID, query.Empty{}, ch) - require.NoError(t, err) - - err = s.Publish(ctx, "Cable") - require.NoError(t, err) - assertReceive(t, "Cable", ch) -} - -func TestUnsubscribeAll(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ctx := context.Background() - ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) - err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1) - require.NoError(t, err) - err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2) - require.NoError(t, err) - - err = s.UnsubscribeAll(ctx, clientID) - require.NoError(t, err) - - err = s.Publish(ctx, "Nick Fury") - require.NoError(t, err) - assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") - assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") - - _, ok := <-ch1 - assert.False(t, ok) - _, ok = <-ch2 - assert.False(t, ok) -} - -func TestBufferCapacity(t *testing.T) { - s := pubsub.NewServer(pubsub.BufferCapacity(2)) - s.SetLogger(log.TestingLogger()) - - assert.Equal(t, 2, s.BufferCapacity()) - - ctx := context.Background() - err := s.Publish(ctx, "Nighthawk") - require.NoError(t, err) - err = s.Publish(ctx, "Sage") - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) - defer cancel() - err = s.Publish(ctx, "Ironclad") - if assert.Error(t, err) { - assert.Equal(t, context.DeadlineExceeded, err) - } -} - -func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } -func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } -func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } - -func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } -func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } -func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } - -func benchmarkNClients(n int, b *testing.B) { - s := pubsub.NewServer() - s.Start() - defer s.Stop() - - ctx := context.Background() - for i := 0; i < n; i++ { - ch := make(chan interface{}) - go func() { - for range ch { - } - }() - s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch) - } - - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})) - } -} - -func benchmarkNClientsOneQuery(n int, b *testing.B) { - s := pubsub.NewServer() - s.Start() - defer s.Stop() - - ctx := context.Background() - q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1") - for i := 0; i < n; i++ { - ch := make(chan interface{}) - go func() { - for range ch { - } - }() - s.Subscribe(ctx, clientID, q, ch) - } - - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})) - } -} - -/////////////////////////////////////////////////////////////////////////////// -/// HELPERS -/////////////////////////////////////////////////////////////////////////////// - -func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { - select { - case actual := <-ch: - if actual != nil { - assert.Equal(t, expected, actual, msgAndArgs...) - } - case <-time.After(1 * time.Second): - t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) - debug.PrintStack() - } -} diff --git a/pubsub/query/Makefile b/pubsub/query/Makefile deleted file mode 100644 index ca3ff5b5..00000000 --- a/pubsub/query/Makefile +++ /dev/null @@ -1,11 +0,0 @@ -gen_query_parser: - @go get github.com/pointlander/peg - peg -inline -switch query.peg - -fuzzy_test: - @go get github.com/dvyukov/go-fuzz/go-fuzz - @go get github.com/dvyukov/go-fuzz/go-fuzz-build - go-fuzz-build github.com/tendermint/tmlibs/pubsub/query/fuzz_test - go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output - -.PHONY: gen_query_parser fuzzy_test diff --git a/pubsub/query/empty.go b/pubsub/query/empty.go deleted file mode 100644 index cefdace4..00000000 --- a/pubsub/query/empty.go +++ /dev/null @@ -1,16 +0,0 @@ -package query - -import "github.com/tendermint/tmlibs/pubsub" - -// Empty query matches any set of tags. -type Empty struct { -} - -// Matches always returns true. -func (Empty) Matches(tags pubsub.TagMap) bool { - return true -} - -func (Empty) String() string { - return "empty" -} diff --git a/pubsub/query/empty_test.go b/pubsub/query/empty_test.go deleted file mode 100644 index b5e8a300..00000000 --- a/pubsub/query/empty_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package query_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/tendermint/tmlibs/pubsub" - "github.com/tendermint/tmlibs/pubsub/query" -) - -func TestEmptyQueryMatchesAnything(t *testing.T) { - q := query.Empty{} - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"}))) -} diff --git a/pubsub/query/fuzz_test/main.go b/pubsub/query/fuzz_test/main.go deleted file mode 100644 index 3b0ef147..00000000 --- a/pubsub/query/fuzz_test/main.go +++ /dev/null @@ -1,30 +0,0 @@ -package fuzz_test - -import ( - "fmt" - - "github.com/tendermint/tmlibs/pubsub/query" -) - -func Fuzz(data []byte) int { - sdata := string(data) - q0, err := query.New(sdata) - if err != nil { - return 0 - } - - sdata1 := q0.String() - q1, err := query.New(sdata1) - if err != nil { - panic(err) - } - - sdata2 := q1.String() - if sdata1 != sdata2 { - fmt.Printf("q0: %q\n", sdata1) - fmt.Printf("q1: %q\n", sdata2) - panic("query changed") - } - - return 1 -} diff --git a/pubsub/query/parser_test.go b/pubsub/query/parser_test.go deleted file mode 100644 index e31079b4..00000000 --- a/pubsub/query/parser_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package query_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/tendermint/tmlibs/pubsub/query" -) - -// TODO: fuzzy testing? -func TestParser(t *testing.T) { - cases := []struct { - query string - valid bool - }{ - {"tm.events.type='NewBlock'", true}, - {"tm.events.type = 'NewBlock'", true}, - {"tm.events.name = ''", true}, - {"tm.events.type='TIME'", true}, - {"tm.events.type='DATE'", true}, - {"tm.events.type='='", true}, - {"tm.events.type='TIME", false}, - {"tm.events.type=TIME'", false}, - {"tm.events.type==", false}, - {"tm.events.type=NewBlock", false}, - {">==", false}, - {"tm.events.type 'NewBlock' =", false}, - {"tm.events.type>'NewBlock'", false}, - {"", false}, - {"=", false}, - {"='NewBlock'", false}, - {"tm.events.type=", false}, - - {"tm.events.typeNewBlock", false}, - {"tm.events.type'NewBlock'", false}, - {"'NewBlock'", false}, - {"NewBlock", false}, - {"", false}, - - {"tm.events.type='NewBlock' AND abci.account.name='Igor'", true}, - {"tm.events.type='NewBlock' AND", false}, - {"tm.events.type='NewBlock' AN", false}, - {"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false}, - {"AND tm.events.type='NewBlock' ", false}, - - {"abci.account.name CONTAINS 'Igor'", true}, - - {"tx.date > DATE 2013-05-03", true}, - {"tx.date < DATE 2013-05-03", true}, - {"tx.date <= DATE 2013-05-03", true}, - {"tx.date >= DATE 2013-05-03", true}, - {"tx.date >= DAT 2013-05-03", false}, - {"tx.date <= DATE2013-05-03", false}, - {"tx.date <= DATE -05-03", false}, - {"tx.date >= DATE 20130503", false}, - {"tx.date >= DATE 2013+01-03", false}, - // incorrect year, month, day - {"tx.date >= DATE 0013-01-03", false}, - {"tx.date >= DATE 2013-31-03", false}, - {"tx.date >= DATE 2013-01-83", false}, - - {"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, - {"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, - {"tx.date <= TIME 2013-05-03T14:45:00Z", true}, - {"tx.date >= TIME 2013-05-03T14:45:00Z", true}, - {"tx.date >= TIME2013-05-03T14:45:00Z", false}, - {"tx.date = IME 2013-05-03T14:45:00Z", false}, - {"tx.date = TIME 2013-05-:45:00Z", false}, - {"tx.date >= TIME 2013-05-03T14:45:00", false}, - {"tx.date >= TIME 0013-00-00T14:45:00Z", false}, - {"tx.date >= TIME 2013+05=03T14:45:00Z", false}, - - {"account.balance=100", true}, - {"account.balance >= 200", true}, - {"account.balance >= -300", false}, - {"account.balance >>= 400", false}, - {"account.balance=33.22.1", false}, - - {"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true}, - {"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false}, - } - - for _, c := range cases { - _, err := query.New(c.query) - if c.valid { - assert.NoErrorf(t, err, "Query was '%s'", c.query) - } else { - assert.Errorf(t, err, "Query was '%s'", c.query) - } - } -} diff --git a/pubsub/query/query.go b/pubsub/query/query.go deleted file mode 100644 index 84c3aa18..00000000 --- a/pubsub/query/query.go +++ /dev/null @@ -1,345 +0,0 @@ -// Package query provides a parser for a custom query format: -// -// abci.invoice.number=22 AND abci.invoice.owner=Ivan -// -// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. -// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics -// -// It has a support for numbers (integer and floating point), dates and times. -package query - -import ( - "fmt" - "reflect" - "strconv" - "strings" - "time" - - "github.com/tendermint/tmlibs/pubsub" -) - -// Query holds the query string and the query parser. -type Query struct { - str string - parser *QueryParser -} - -// Condition represents a single condition within a query and consists of tag -// (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7"). -type Condition struct { - Tag string - Op Operator - Operand interface{} -} - -// New parses the given string and returns a query or error if the string is -// invalid. -func New(s string) (*Query, error) { - p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} - p.Init() - if err := p.Parse(); err != nil { - return nil, err - } - return &Query{str: s, parser: p}, nil -} - -// MustParse turns the given string into a query or panics; for tests or others -// cases where you know the string is valid. -func MustParse(s string) *Query { - q, err := New(s) - if err != nil { - panic(fmt.Sprintf("failed to parse %s: %v", s, err)) - } - return q -} - -// String returns the original string. -func (q *Query) String() string { - return q.str -} - -// Operator is an operator that defines some kind of relation between tag and -// operand (equality, etc.). -type Operator uint8 - -const ( - // "<=" - OpLessEqual Operator = iota - // ">=" - OpGreaterEqual - // "<" - OpLess - // ">" - OpGreater - // "=" - OpEqual - // "CONTAINS"; used to check if a string contains a certain sub string. - OpContains -) - -// Conditions returns a list of conditions. -func (q *Query) Conditions() []Condition { - conditions := make([]Condition, 0) - - buffer, begin, end := q.parser.Buffer, 0, 0 - - var tag string - var op Operator - - // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") - for _, token := range q.parser.Tokens() { - switch token.pegRule { - - case rulePegText: - begin, end = int(token.begin), int(token.end) - case ruletag: - tag = buffer[begin:end] - case rulele: - op = OpLessEqual - case rulege: - op = OpGreaterEqual - case rulel: - op = OpLess - case ruleg: - op = OpGreater - case ruleequal: - op = OpEqual - case rulecontains: - op = OpContains - case rulevalue: - // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") - valueWithoutSingleQuotes := buffer[begin+1 : end-1] - conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes}) - case rulenumber: - number := buffer[begin:end] - if strings.Contains(number, ".") { // if it looks like a floating-point number - value, err := strconv.ParseFloat(number, 64) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) - } - conditions = append(conditions, Condition{tag, op, value}) - } else { - value, err := strconv.ParseInt(number, 10, 64) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) - } - conditions = append(conditions, Condition{tag, op, value}) - } - case ruletime: - value, err := time.Parse(time.RFC3339, buffer[begin:end]) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) - } - conditions = append(conditions, Condition{tag, op, value}) - case ruledate: - value, err := time.Parse("2006-01-02", buffer[begin:end]) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) - } - conditions = append(conditions, Condition{tag, op, value}) - } - } - - return conditions -} - -// Matches returns true if the query matches the given set of tags, false otherwise. -// -// For example, query "name=John" matches tags = {"name": "John"}. More -// examples could be found in parser_test.go and query_test.go. -func (q *Query) Matches(tags pubsub.TagMap) bool { - if tags.Len() == 0 { - return false - } - - buffer, begin, end := q.parser.Buffer, 0, 0 - - var tag string - var op Operator - - // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") - for _, token := range q.parser.Tokens() { - switch token.pegRule { - - case rulePegText: - begin, end = int(token.begin), int(token.end) - case ruletag: - tag = buffer[begin:end] - case rulele: - op = OpLessEqual - case rulege: - op = OpGreaterEqual - case rulel: - op = OpLess - case ruleg: - op = OpGreater - case ruleequal: - op = OpEqual - case rulecontains: - op = OpContains - case rulevalue: - // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") - valueWithoutSingleQuotes := buffer[begin+1 : end-1] - - // see if the triplet (tag, operator, operand) matches any tag - // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } - if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) { - return false - } - case rulenumber: - number := buffer[begin:end] - if strings.Contains(number, ".") { // if it looks like a floating-point number - value, err := strconv.ParseFloat(number, 64) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) - } - if !match(tag, op, reflect.ValueOf(value), tags) { - return false - } - } else { - value, err := strconv.ParseInt(number, 10, 64) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) - } - if !match(tag, op, reflect.ValueOf(value), tags) { - return false - } - } - case ruletime: - value, err := time.Parse(time.RFC3339, buffer[begin:end]) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) - } - if !match(tag, op, reflect.ValueOf(value), tags) { - return false - } - case ruledate: - value, err := time.Parse("2006-01-02", buffer[begin:end]) - if err != nil { - panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) - } - if !match(tag, op, reflect.ValueOf(value), tags) { - return false - } - } - } - - return true -} - -// match returns true if the given triplet (tag, operator, operand) matches any tag. -// -// First, it looks up the tag in tags and if it finds one, tries to compare the -// value from it to the operand using the operator. -// -// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } -func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool { - // look up the tag from the query in tags - value, ok := tags.Get(tag) - if !ok { - return false - } - switch operand.Kind() { - case reflect.Struct: // time - operandAsTime := operand.Interface().(time.Time) - v, ok := value.(time.Time) - if !ok { // if value from tags is not time.Time - return false - } - switch op { - case OpLessEqual: - return v.Before(operandAsTime) || v.Equal(operandAsTime) - case OpGreaterEqual: - return v.Equal(operandAsTime) || v.After(operandAsTime) - case OpLess: - return v.Before(operandAsTime) - case OpGreater: - return v.After(operandAsTime) - case OpEqual: - return v.Equal(operandAsTime) - } - case reflect.Float64: - operandFloat64 := operand.Interface().(float64) - var v float64 - // try our best to convert value from tags to float64 - switch vt := value.(type) { - case float64: - v = vt - case float32: - v = float64(vt) - case int: - v = float64(vt) - case int8: - v = float64(vt) - case int16: - v = float64(vt) - case int32: - v = float64(vt) - case int64: - v = float64(vt) - default: // fail for all other types - panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) - } - switch op { - case OpLessEqual: - return v <= operandFloat64 - case OpGreaterEqual: - return v >= operandFloat64 - case OpLess: - return v < operandFloat64 - case OpGreater: - return v > operandFloat64 - case OpEqual: - return v == operandFloat64 - } - case reflect.Int64: - operandInt := operand.Interface().(int64) - var v int64 - // try our best to convert value from tags to int64 - switch vt := value.(type) { - case int64: - v = vt - case int8: - v = int64(vt) - case int16: - v = int64(vt) - case int32: - v = int64(vt) - case int: - v = int64(vt) - case float64: - v = int64(vt) - case float32: - v = int64(vt) - default: // fail for all other types - panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) - } - switch op { - case OpLessEqual: - return v <= operandInt - case OpGreaterEqual: - return v >= operandInt - case OpLess: - return v < operandInt - case OpGreater: - return v > operandInt - case OpEqual: - return v == operandInt - } - case reflect.String: - v, ok := value.(string) - if !ok { // if value from tags is not string - return false - } - switch op { - case OpEqual: - return v == operand.String() - case OpContains: - return strings.Contains(v, operand.String()) - } - default: - panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) - } - - return false -} diff --git a/pubsub/query/query.peg b/pubsub/query/query.peg deleted file mode 100644 index 739892e4..00000000 --- a/pubsub/query/query.peg +++ /dev/null @@ -1,33 +0,0 @@ -package query - -type QueryParser Peg { -} - -e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !. - -condition <- tag ' '* (le ' '* (number / time / date) - / ge ' '* (number / time / date) - / l ' '* (number / time / date) - / g ' '* (number / time / date) - / equal ' '* (number / time / date / value) - / contains ' '* value - ) - -tag <- < (![ \t\n\r\\()"'=><] .)+ > -value <- < '\'' (!["'] .)* '\''> -number <- < ('0' - / [1-9] digit* ('.' digit*)?) > -digit <- [0-9] -time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') > -date <- "DATE " < year '-' month '-' day > -year <- ('1' / '2') digit digit digit -month <- ('0' / '1') digit -day <- ('0' / '1' / '2' / '3') digit -and <- "AND" - -equal <- "=" -contains <- "CONTAINS" -le <- "<=" -ge <- ">=" -l <- "<" -g <- ">" diff --git a/pubsub/query/query.peg.go b/pubsub/query/query.peg.go deleted file mode 100644 index c86e4a47..00000000 --- a/pubsub/query/query.peg.go +++ /dev/null @@ -1,1553 +0,0 @@ -// nolint -package query - -import ( - "fmt" - "math" - "sort" - "strconv" -) - -const endSymbol rune = 1114112 - -/* The rule types inferred from the grammar are below. */ -type pegRule uint8 - -const ( - ruleUnknown pegRule = iota - rulee - rulecondition - ruletag - rulevalue - rulenumber - ruledigit - ruletime - ruledate - ruleyear - rulemonth - ruleday - ruleand - ruleequal - rulecontains - rulele - rulege - rulel - ruleg - rulePegText -) - -var rul3s = [...]string{ - "Unknown", - "e", - "condition", - "tag", - "value", - "number", - "digit", - "time", - "date", - "year", - "month", - "day", - "and", - "equal", - "contains", - "le", - "ge", - "l", - "g", - "PegText", -} - -type token32 struct { - pegRule - begin, end uint32 -} - -func (t *token32) String() string { - return fmt.Sprintf("\x1B[34m%v\x1B[m %v %v", rul3s[t.pegRule], t.begin, t.end) -} - -type node32 struct { - token32 - up, next *node32 -} - -func (node *node32) print(pretty bool, buffer string) { - var print func(node *node32, depth int) - print = func(node *node32, depth int) { - for node != nil { - for c := 0; c < depth; c++ { - fmt.Printf(" ") - } - rule := rul3s[node.pegRule] - quote := strconv.Quote(string(([]rune(buffer)[node.begin:node.end]))) - if !pretty { - fmt.Printf("%v %v\n", rule, quote) - } else { - fmt.Printf("\x1B[34m%v\x1B[m %v\n", rule, quote) - } - if node.up != nil { - print(node.up, depth+1) - } - node = node.next - } - } - print(node, 0) -} - -func (node *node32) Print(buffer string) { - node.print(false, buffer) -} - -func (node *node32) PrettyPrint(buffer string) { - node.print(true, buffer) -} - -type tokens32 struct { - tree []token32 -} - -func (t *tokens32) Trim(length uint32) { - t.tree = t.tree[:length] -} - -func (t *tokens32) Print() { - for _, token := range t.tree { - fmt.Println(token.String()) - } -} - -func (t *tokens32) AST() *node32 { - type element struct { - node *node32 - down *element - } - tokens := t.Tokens() - var stack *element - for _, token := range tokens { - if token.begin == token.end { - continue - } - node := &node32{token32: token} - for stack != nil && stack.node.begin >= token.begin && stack.node.end <= token.end { - stack.node.next = node.up - node.up = stack.node - stack = stack.down - } - stack = &element{node: node, down: stack} - } - if stack != nil { - return stack.node - } - return nil -} - -func (t *tokens32) PrintSyntaxTree(buffer string) { - t.AST().Print(buffer) -} - -func (t *tokens32) PrettyPrintSyntaxTree(buffer string) { - t.AST().PrettyPrint(buffer) -} - -func (t *tokens32) Add(rule pegRule, begin, end, index uint32) { - if tree := t.tree; int(index) >= len(tree) { - expanded := make([]token32, 2*len(tree)) - copy(expanded, tree) - t.tree = expanded - } - t.tree[index] = token32{ - pegRule: rule, - begin: begin, - end: end, - } -} - -func (t *tokens32) Tokens() []token32 { - return t.tree -} - -type QueryParser struct { - Buffer string - buffer []rune - rules [20]func() bool - parse func(rule ...int) error - reset func() - Pretty bool - tokens32 -} - -func (p *QueryParser) Parse(rule ...int) error { - return p.parse(rule...) -} - -func (p *QueryParser) Reset() { - p.reset() -} - -type textPosition struct { - line, symbol int -} - -type textPositionMap map[int]textPosition - -func translatePositions(buffer []rune, positions []int) textPositionMap { - length, translations, j, line, symbol := len(positions), make(textPositionMap, len(positions)), 0, 1, 0 - sort.Ints(positions) - -search: - for i, c := range buffer { - if c == '\n' { - line, symbol = line+1, 0 - } else { - symbol++ - } - if i == positions[j] { - translations[positions[j]] = textPosition{line, symbol} - for j++; j < length; j++ { - if i != positions[j] { - continue search - } - } - break search - } - } - - return translations -} - -type parseError struct { - p *QueryParser - max token32 -} - -func (e *parseError) Error() string { - tokens, error := []token32{e.max}, "\n" - positions, p := make([]int, 2*len(tokens)), 0 - for _, token := range tokens { - positions[p], p = int(token.begin), p+1 - positions[p], p = int(token.end), p+1 - } - translations := translatePositions(e.p.buffer, positions) - format := "parse error near %v (line %v symbol %v - line %v symbol %v):\n%v\n" - if e.p.Pretty { - format = "parse error near \x1B[34m%v\x1B[m (line %v symbol %v - line %v symbol %v):\n%v\n" - } - for _, token := range tokens { - begin, end := int(token.begin), int(token.end) - error += fmt.Sprintf(format, - rul3s[token.pegRule], - translations[begin].line, translations[begin].symbol, - translations[end].line, translations[end].symbol, - strconv.Quote(string(e.p.buffer[begin:end]))) - } - - return error -} - -func (p *QueryParser) PrintSyntaxTree() { - if p.Pretty { - p.tokens32.PrettyPrintSyntaxTree(p.Buffer) - } else { - p.tokens32.PrintSyntaxTree(p.Buffer) - } -} - -func (p *QueryParser) Init() { - var ( - max token32 - position, tokenIndex uint32 - buffer []rune - ) - p.reset = func() { - max = token32{} - position, tokenIndex = 0, 0 - - p.buffer = []rune(p.Buffer) - if len(p.buffer) == 0 || p.buffer[len(p.buffer)-1] != endSymbol { - p.buffer = append(p.buffer, endSymbol) - } - buffer = p.buffer - } - p.reset() - - _rules := p.rules - tree := tokens32{tree: make([]token32, math.MaxInt16)} - p.parse = func(rule ...int) error { - r := 1 - if len(rule) > 0 { - r = rule[0] - } - matches := p.rules[r]() - p.tokens32 = tree - if matches { - p.Trim(tokenIndex) - return nil - } - return &parseError{p, max} - } - - add := func(rule pegRule, begin uint32) { - tree.Add(rule, begin, position, tokenIndex) - tokenIndex++ - if begin != position && position > max.end { - max = token32{rule, begin, position} - } - } - - matchDot := func() bool { - if buffer[position] != endSymbol { - position++ - return true - } - return false - } - - /*matchChar := func(c byte) bool { - if buffer[position] == c { - position++ - return true - } - return false - }*/ - - /*matchRange := func(lower byte, upper byte) bool { - if c := buffer[position]; c >= lower && c <= upper { - position++ - return true - } - return false - }*/ - - _rules = [...]func() bool{ - nil, - /* 0 e <- <('"' condition (' '+ and ' '+ condition)* '"' !.)> */ - func() bool { - position0, tokenIndex0 := position, tokenIndex - { - position1 := position - if buffer[position] != rune('"') { - goto l0 - } - position++ - if !_rules[rulecondition]() { - goto l0 - } - l2: - { - position3, tokenIndex3 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l3 - } - position++ - l4: - { - position5, tokenIndex5 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l5 - } - position++ - goto l4 - l5: - position, tokenIndex = position5, tokenIndex5 - } - { - position6 := position - { - position7, tokenIndex7 := position, tokenIndex - if buffer[position] != rune('a') { - goto l8 - } - position++ - goto l7 - l8: - position, tokenIndex = position7, tokenIndex7 - if buffer[position] != rune('A') { - goto l3 - } - position++ - } - l7: - { - position9, tokenIndex9 := position, tokenIndex - if buffer[position] != rune('n') { - goto l10 - } - position++ - goto l9 - l10: - position, tokenIndex = position9, tokenIndex9 - if buffer[position] != rune('N') { - goto l3 - } - position++ - } - l9: - { - position11, tokenIndex11 := position, tokenIndex - if buffer[position] != rune('d') { - goto l12 - } - position++ - goto l11 - l12: - position, tokenIndex = position11, tokenIndex11 - if buffer[position] != rune('D') { - goto l3 - } - position++ - } - l11: - add(ruleand, position6) - } - if buffer[position] != rune(' ') { - goto l3 - } - position++ - l13: - { - position14, tokenIndex14 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l14 - } - position++ - goto l13 - l14: - position, tokenIndex = position14, tokenIndex14 - } - if !_rules[rulecondition]() { - goto l3 - } - goto l2 - l3: - position, tokenIndex = position3, tokenIndex3 - } - if buffer[position] != rune('"') { - goto l0 - } - position++ - { - position15, tokenIndex15 := position, tokenIndex - if !matchDot() { - goto l15 - } - goto l0 - l15: - position, tokenIndex = position15, tokenIndex15 - } - add(rulee, position1) - } - return true - l0: - position, tokenIndex = position0, tokenIndex0 - return false - }, - /* 1 condition <- <(tag ' '* ((le ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / (ge ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / ((&('=') (equal ' '* ((&('\'') value) | (&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('>') (g ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('<') (l ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('C' | 'c') (contains ' '* value)))))> */ - func() bool { - position16, tokenIndex16 := position, tokenIndex - { - position17 := position - { - position18 := position - { - position19 := position - { - position22, tokenIndex22 := position, tokenIndex - { - switch buffer[position] { - case '<': - if buffer[position] != rune('<') { - goto l22 - } - position++ - break - case '>': - if buffer[position] != rune('>') { - goto l22 - } - position++ - break - case '=': - if buffer[position] != rune('=') { - goto l22 - } - position++ - break - case '\'': - if buffer[position] != rune('\'') { - goto l22 - } - position++ - break - case '"': - if buffer[position] != rune('"') { - goto l22 - } - position++ - break - case ')': - if buffer[position] != rune(')') { - goto l22 - } - position++ - break - case '(': - if buffer[position] != rune('(') { - goto l22 - } - position++ - break - case '\\': - if buffer[position] != rune('\\') { - goto l22 - } - position++ - break - case '\r': - if buffer[position] != rune('\r') { - goto l22 - } - position++ - break - case '\n': - if buffer[position] != rune('\n') { - goto l22 - } - position++ - break - case '\t': - if buffer[position] != rune('\t') { - goto l22 - } - position++ - break - default: - if buffer[position] != rune(' ') { - goto l22 - } - position++ - break - } - } - - goto l16 - l22: - position, tokenIndex = position22, tokenIndex22 - } - if !matchDot() { - goto l16 - } - l20: - { - position21, tokenIndex21 := position, tokenIndex - { - position24, tokenIndex24 := position, tokenIndex - { - switch buffer[position] { - case '<': - if buffer[position] != rune('<') { - goto l24 - } - position++ - break - case '>': - if buffer[position] != rune('>') { - goto l24 - } - position++ - break - case '=': - if buffer[position] != rune('=') { - goto l24 - } - position++ - break - case '\'': - if buffer[position] != rune('\'') { - goto l24 - } - position++ - break - case '"': - if buffer[position] != rune('"') { - goto l24 - } - position++ - break - case ')': - if buffer[position] != rune(')') { - goto l24 - } - position++ - break - case '(': - if buffer[position] != rune('(') { - goto l24 - } - position++ - break - case '\\': - if buffer[position] != rune('\\') { - goto l24 - } - position++ - break - case '\r': - if buffer[position] != rune('\r') { - goto l24 - } - position++ - break - case '\n': - if buffer[position] != rune('\n') { - goto l24 - } - position++ - break - case '\t': - if buffer[position] != rune('\t') { - goto l24 - } - position++ - break - default: - if buffer[position] != rune(' ') { - goto l24 - } - position++ - break - } - } - - goto l21 - l24: - position, tokenIndex = position24, tokenIndex24 - } - if !matchDot() { - goto l21 - } - goto l20 - l21: - position, tokenIndex = position21, tokenIndex21 - } - add(rulePegText, position19) - } - add(ruletag, position18) - } - l26: - { - position27, tokenIndex27 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l27 - } - position++ - goto l26 - l27: - position, tokenIndex = position27, tokenIndex27 - } - { - position28, tokenIndex28 := position, tokenIndex - { - position30 := position - if buffer[position] != rune('<') { - goto l29 - } - position++ - if buffer[position] != rune('=') { - goto l29 - } - position++ - add(rulele, position30) - } - l31: - { - position32, tokenIndex32 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l32 - } - position++ - goto l31 - l32: - position, tokenIndex = position32, tokenIndex32 - } - { - switch buffer[position] { - case 'D', 'd': - if !_rules[ruledate]() { - goto l29 - } - break - case 'T', 't': - if !_rules[ruletime]() { - goto l29 - } - break - default: - if !_rules[rulenumber]() { - goto l29 - } - break - } - } - - goto l28 - l29: - position, tokenIndex = position28, tokenIndex28 - { - position35 := position - if buffer[position] != rune('>') { - goto l34 - } - position++ - if buffer[position] != rune('=') { - goto l34 - } - position++ - add(rulege, position35) - } - l36: - { - position37, tokenIndex37 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l37 - } - position++ - goto l36 - l37: - position, tokenIndex = position37, tokenIndex37 - } - { - switch buffer[position] { - case 'D', 'd': - if !_rules[ruledate]() { - goto l34 - } - break - case 'T', 't': - if !_rules[ruletime]() { - goto l34 - } - break - default: - if !_rules[rulenumber]() { - goto l34 - } - break - } - } - - goto l28 - l34: - position, tokenIndex = position28, tokenIndex28 - { - switch buffer[position] { - case '=': - { - position40 := position - if buffer[position] != rune('=') { - goto l16 - } - position++ - add(ruleequal, position40) - } - l41: - { - position42, tokenIndex42 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l42 - } - position++ - goto l41 - l42: - position, tokenIndex = position42, tokenIndex42 - } - { - switch buffer[position] { - case '\'': - if !_rules[rulevalue]() { - goto l16 - } - break - case 'D', 'd': - if !_rules[ruledate]() { - goto l16 - } - break - case 'T', 't': - if !_rules[ruletime]() { - goto l16 - } - break - default: - if !_rules[rulenumber]() { - goto l16 - } - break - } - } - - break - case '>': - { - position44 := position - if buffer[position] != rune('>') { - goto l16 - } - position++ - add(ruleg, position44) - } - l45: - { - position46, tokenIndex46 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l46 - } - position++ - goto l45 - l46: - position, tokenIndex = position46, tokenIndex46 - } - { - switch buffer[position] { - case 'D', 'd': - if !_rules[ruledate]() { - goto l16 - } - break - case 'T', 't': - if !_rules[ruletime]() { - goto l16 - } - break - default: - if !_rules[rulenumber]() { - goto l16 - } - break - } - } - - break - case '<': - { - position48 := position - if buffer[position] != rune('<') { - goto l16 - } - position++ - add(rulel, position48) - } - l49: - { - position50, tokenIndex50 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l50 - } - position++ - goto l49 - l50: - position, tokenIndex = position50, tokenIndex50 - } - { - switch buffer[position] { - case 'D', 'd': - if !_rules[ruledate]() { - goto l16 - } - break - case 'T', 't': - if !_rules[ruletime]() { - goto l16 - } - break - default: - if !_rules[rulenumber]() { - goto l16 - } - break - } - } - - break - default: - { - position52 := position - { - position53, tokenIndex53 := position, tokenIndex - if buffer[position] != rune('c') { - goto l54 - } - position++ - goto l53 - l54: - position, tokenIndex = position53, tokenIndex53 - if buffer[position] != rune('C') { - goto l16 - } - position++ - } - l53: - { - position55, tokenIndex55 := position, tokenIndex - if buffer[position] != rune('o') { - goto l56 - } - position++ - goto l55 - l56: - position, tokenIndex = position55, tokenIndex55 - if buffer[position] != rune('O') { - goto l16 - } - position++ - } - l55: - { - position57, tokenIndex57 := position, tokenIndex - if buffer[position] != rune('n') { - goto l58 - } - position++ - goto l57 - l58: - position, tokenIndex = position57, tokenIndex57 - if buffer[position] != rune('N') { - goto l16 - } - position++ - } - l57: - { - position59, tokenIndex59 := position, tokenIndex - if buffer[position] != rune('t') { - goto l60 - } - position++ - goto l59 - l60: - position, tokenIndex = position59, tokenIndex59 - if buffer[position] != rune('T') { - goto l16 - } - position++ - } - l59: - { - position61, tokenIndex61 := position, tokenIndex - if buffer[position] != rune('a') { - goto l62 - } - position++ - goto l61 - l62: - position, tokenIndex = position61, tokenIndex61 - if buffer[position] != rune('A') { - goto l16 - } - position++ - } - l61: - { - position63, tokenIndex63 := position, tokenIndex - if buffer[position] != rune('i') { - goto l64 - } - position++ - goto l63 - l64: - position, tokenIndex = position63, tokenIndex63 - if buffer[position] != rune('I') { - goto l16 - } - position++ - } - l63: - { - position65, tokenIndex65 := position, tokenIndex - if buffer[position] != rune('n') { - goto l66 - } - position++ - goto l65 - l66: - position, tokenIndex = position65, tokenIndex65 - if buffer[position] != rune('N') { - goto l16 - } - position++ - } - l65: - { - position67, tokenIndex67 := position, tokenIndex - if buffer[position] != rune('s') { - goto l68 - } - position++ - goto l67 - l68: - position, tokenIndex = position67, tokenIndex67 - if buffer[position] != rune('S') { - goto l16 - } - position++ - } - l67: - add(rulecontains, position52) - } - l69: - { - position70, tokenIndex70 := position, tokenIndex - if buffer[position] != rune(' ') { - goto l70 - } - position++ - goto l69 - l70: - position, tokenIndex = position70, tokenIndex70 - } - if !_rules[rulevalue]() { - goto l16 - } - break - } - } - - } - l28: - add(rulecondition, position17) - } - return true - l16: - position, tokenIndex = position16, tokenIndex16 - return false - }, - /* 2 tag <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('\'') '\'') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */ - nil, - /* 3 value <- <<('\'' (!('"' / '\'') .)* '\'')>> */ - func() bool { - position72, tokenIndex72 := position, tokenIndex - { - position73 := position - { - position74 := position - if buffer[position] != rune('\'') { - goto l72 - } - position++ - l75: - { - position76, tokenIndex76 := position, tokenIndex - { - position77, tokenIndex77 := position, tokenIndex - { - position78, tokenIndex78 := position, tokenIndex - if buffer[position] != rune('"') { - goto l79 - } - position++ - goto l78 - l79: - position, tokenIndex = position78, tokenIndex78 - if buffer[position] != rune('\'') { - goto l77 - } - position++ - } - l78: - goto l76 - l77: - position, tokenIndex = position77, tokenIndex77 - } - if !matchDot() { - goto l76 - } - goto l75 - l76: - position, tokenIndex = position76, tokenIndex76 - } - if buffer[position] != rune('\'') { - goto l72 - } - position++ - add(rulePegText, position74) - } - add(rulevalue, position73) - } - return true - l72: - position, tokenIndex = position72, tokenIndex72 - return false - }, - /* 4 number <- <<('0' / ([1-9] digit* ('.' digit*)?))>> */ - func() bool { - position80, tokenIndex80 := position, tokenIndex - { - position81 := position - { - position82 := position - { - position83, tokenIndex83 := position, tokenIndex - if buffer[position] != rune('0') { - goto l84 - } - position++ - goto l83 - l84: - position, tokenIndex = position83, tokenIndex83 - if c := buffer[position]; c < rune('1') || c > rune('9') { - goto l80 - } - position++ - l85: - { - position86, tokenIndex86 := position, tokenIndex - if !_rules[ruledigit]() { - goto l86 - } - goto l85 - l86: - position, tokenIndex = position86, tokenIndex86 - } - { - position87, tokenIndex87 := position, tokenIndex - if buffer[position] != rune('.') { - goto l87 - } - position++ - l89: - { - position90, tokenIndex90 := position, tokenIndex - if !_rules[ruledigit]() { - goto l90 - } - goto l89 - l90: - position, tokenIndex = position90, tokenIndex90 - } - goto l88 - l87: - position, tokenIndex = position87, tokenIndex87 - } - l88: - } - l83: - add(rulePegText, position82) - } - add(rulenumber, position81) - } - return true - l80: - position, tokenIndex = position80, tokenIndex80 - return false - }, - /* 5 digit <- <[0-9]> */ - func() bool { - position91, tokenIndex91 := position, tokenIndex - { - position92 := position - if c := buffer[position]; c < rune('0') || c > rune('9') { - goto l91 - } - position++ - add(ruledigit, position92) - } - return true - l91: - position, tokenIndex = position91, tokenIndex91 - return false - }, - /* 6 time <- <(('t' / 'T') ('i' / 'I') ('m' / 'M') ('e' / 'E') ' ' <(year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit ((('-' / '+') digit digit ':' digit digit) / 'Z'))>)> */ - func() bool { - position93, tokenIndex93 := position, tokenIndex - { - position94 := position - { - position95, tokenIndex95 := position, tokenIndex - if buffer[position] != rune('t') { - goto l96 - } - position++ - goto l95 - l96: - position, tokenIndex = position95, tokenIndex95 - if buffer[position] != rune('T') { - goto l93 - } - position++ - } - l95: - { - position97, tokenIndex97 := position, tokenIndex - if buffer[position] != rune('i') { - goto l98 - } - position++ - goto l97 - l98: - position, tokenIndex = position97, tokenIndex97 - if buffer[position] != rune('I') { - goto l93 - } - position++ - } - l97: - { - position99, tokenIndex99 := position, tokenIndex - if buffer[position] != rune('m') { - goto l100 - } - position++ - goto l99 - l100: - position, tokenIndex = position99, tokenIndex99 - if buffer[position] != rune('M') { - goto l93 - } - position++ - } - l99: - { - position101, tokenIndex101 := position, tokenIndex - if buffer[position] != rune('e') { - goto l102 - } - position++ - goto l101 - l102: - position, tokenIndex = position101, tokenIndex101 - if buffer[position] != rune('E') { - goto l93 - } - position++ - } - l101: - if buffer[position] != rune(' ') { - goto l93 - } - position++ - { - position103 := position - if !_rules[ruleyear]() { - goto l93 - } - if buffer[position] != rune('-') { - goto l93 - } - position++ - if !_rules[rulemonth]() { - goto l93 - } - if buffer[position] != rune('-') { - goto l93 - } - position++ - if !_rules[ruleday]() { - goto l93 - } - if buffer[position] != rune('T') { - goto l93 - } - position++ - if !_rules[ruledigit]() { - goto l93 - } - if !_rules[ruledigit]() { - goto l93 - } - if buffer[position] != rune(':') { - goto l93 - } - position++ - if !_rules[ruledigit]() { - goto l93 - } - if !_rules[ruledigit]() { - goto l93 - } - if buffer[position] != rune(':') { - goto l93 - } - position++ - if !_rules[ruledigit]() { - goto l93 - } - if !_rules[ruledigit]() { - goto l93 - } - { - position104, tokenIndex104 := position, tokenIndex - { - position106, tokenIndex106 := position, tokenIndex - if buffer[position] != rune('-') { - goto l107 - } - position++ - goto l106 - l107: - position, tokenIndex = position106, tokenIndex106 - if buffer[position] != rune('+') { - goto l105 - } - position++ - } - l106: - if !_rules[ruledigit]() { - goto l105 - } - if !_rules[ruledigit]() { - goto l105 - } - if buffer[position] != rune(':') { - goto l105 - } - position++ - if !_rules[ruledigit]() { - goto l105 - } - if !_rules[ruledigit]() { - goto l105 - } - goto l104 - l105: - position, tokenIndex = position104, tokenIndex104 - if buffer[position] != rune('Z') { - goto l93 - } - position++ - } - l104: - add(rulePegText, position103) - } - add(ruletime, position94) - } - return true - l93: - position, tokenIndex = position93, tokenIndex93 - return false - }, - /* 7 date <- <(('d' / 'D') ('a' / 'A') ('t' / 'T') ('e' / 'E') ' ' <(year '-' month '-' day)>)> */ - func() bool { - position108, tokenIndex108 := position, tokenIndex - { - position109 := position - { - position110, tokenIndex110 := position, tokenIndex - if buffer[position] != rune('d') { - goto l111 - } - position++ - goto l110 - l111: - position, tokenIndex = position110, tokenIndex110 - if buffer[position] != rune('D') { - goto l108 - } - position++ - } - l110: - { - position112, tokenIndex112 := position, tokenIndex - if buffer[position] != rune('a') { - goto l113 - } - position++ - goto l112 - l113: - position, tokenIndex = position112, tokenIndex112 - if buffer[position] != rune('A') { - goto l108 - } - position++ - } - l112: - { - position114, tokenIndex114 := position, tokenIndex - if buffer[position] != rune('t') { - goto l115 - } - position++ - goto l114 - l115: - position, tokenIndex = position114, tokenIndex114 - if buffer[position] != rune('T') { - goto l108 - } - position++ - } - l114: - { - position116, tokenIndex116 := position, tokenIndex - if buffer[position] != rune('e') { - goto l117 - } - position++ - goto l116 - l117: - position, tokenIndex = position116, tokenIndex116 - if buffer[position] != rune('E') { - goto l108 - } - position++ - } - l116: - if buffer[position] != rune(' ') { - goto l108 - } - position++ - { - position118 := position - if !_rules[ruleyear]() { - goto l108 - } - if buffer[position] != rune('-') { - goto l108 - } - position++ - if !_rules[rulemonth]() { - goto l108 - } - if buffer[position] != rune('-') { - goto l108 - } - position++ - if !_rules[ruleday]() { - goto l108 - } - add(rulePegText, position118) - } - add(ruledate, position109) - } - return true - l108: - position, tokenIndex = position108, tokenIndex108 - return false - }, - /* 8 year <- <(('1' / '2') digit digit digit)> */ - func() bool { - position119, tokenIndex119 := position, tokenIndex - { - position120 := position - { - position121, tokenIndex121 := position, tokenIndex - if buffer[position] != rune('1') { - goto l122 - } - position++ - goto l121 - l122: - position, tokenIndex = position121, tokenIndex121 - if buffer[position] != rune('2') { - goto l119 - } - position++ - } - l121: - if !_rules[ruledigit]() { - goto l119 - } - if !_rules[ruledigit]() { - goto l119 - } - if !_rules[ruledigit]() { - goto l119 - } - add(ruleyear, position120) - } - return true - l119: - position, tokenIndex = position119, tokenIndex119 - return false - }, - /* 9 month <- <(('0' / '1') digit)> */ - func() bool { - position123, tokenIndex123 := position, tokenIndex - { - position124 := position - { - position125, tokenIndex125 := position, tokenIndex - if buffer[position] != rune('0') { - goto l126 - } - position++ - goto l125 - l126: - position, tokenIndex = position125, tokenIndex125 - if buffer[position] != rune('1') { - goto l123 - } - position++ - } - l125: - if !_rules[ruledigit]() { - goto l123 - } - add(rulemonth, position124) - } - return true - l123: - position, tokenIndex = position123, tokenIndex123 - return false - }, - /* 10 day <- <(((&('3') '3') | (&('2') '2') | (&('1') '1') | (&('0') '0')) digit)> */ - func() bool { - position127, tokenIndex127 := position, tokenIndex - { - position128 := position - { - switch buffer[position] { - case '3': - if buffer[position] != rune('3') { - goto l127 - } - position++ - break - case '2': - if buffer[position] != rune('2') { - goto l127 - } - position++ - break - case '1': - if buffer[position] != rune('1') { - goto l127 - } - position++ - break - default: - if buffer[position] != rune('0') { - goto l127 - } - position++ - break - } - } - - if !_rules[ruledigit]() { - goto l127 - } - add(ruleday, position128) - } - return true - l127: - position, tokenIndex = position127, tokenIndex127 - return false - }, - /* 11 and <- <(('a' / 'A') ('n' / 'N') ('d' / 'D'))> */ - nil, - /* 12 equal <- <'='> */ - nil, - /* 13 contains <- <(('c' / 'C') ('o' / 'O') ('n' / 'N') ('t' / 'T') ('a' / 'A') ('i' / 'I') ('n' / 'N') ('s' / 'S'))> */ - nil, - /* 14 le <- <('<' '=')> */ - nil, - /* 15 ge <- <('>' '=')> */ - nil, - /* 16 l <- <'<'> */ - nil, - /* 17 g <- <'>'> */ - nil, - nil, - } - p.rules = _rules -} diff --git a/pubsub/query/query_test.go b/pubsub/query/query_test.go deleted file mode 100644 index 7d3ac6ba..00000000 --- a/pubsub/query/query_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package query_test - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/tendermint/tmlibs/pubsub" - "github.com/tendermint/tmlibs/pubsub/query" -) - -func TestMatches(t *testing.T) { - const shortForm = "2006-Jan-02" - txDate, err := time.Parse(shortForm, "2017-Jan-01") - require.NoError(t, err) - txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") - require.NoError(t, err) - - testCases := []struct { - s string - tags map[string]interface{} - err bool - matches bool - }{ - {"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, - - {"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, - {"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, - {"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, - {"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, - {"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, - {"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, - {"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, - {"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, - - {"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, - {"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, - {"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, - - {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, - {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, - - {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, - {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, - } - - for _, tc := range testCases { - q, err := query.New(tc.s) - if !tc.err { - require.Nil(t, err) - } - - if tc.matches { - assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags) - } else { - assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags) - } - } -} - -func TestMustParse(t *testing.T) { - assert.Panics(t, func() { query.MustParse("=") }) - assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) -} - -func TestConditions(t *testing.T) { - txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z") - require.NoError(t, err) - - testCases := []struct { - s string - conditions []query.Condition - }{ - {s: "tm.events.type='NewBlock'", conditions: []query.Condition{query.Condition{Tag: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"}}}, - {s: "tx.gas > 7 AND tx.gas < 9", conditions: []query.Condition{query.Condition{Tag: "tx.gas", Op: query.OpGreater, Operand: int64(7)}, query.Condition{Tag: "tx.gas", Op: query.OpLess, Operand: int64(9)}}}, - {s: "tx.time >= TIME 2013-05-03T14:45:00Z", conditions: []query.Condition{query.Condition{Tag: "tx.time", Op: query.OpGreaterEqual, Operand: txTime}}}, - } - - for _, tc := range testCases { - q, err := query.New(tc.s) - require.Nil(t, err) - - assert.Equal(t, tc.conditions, q.Conditions()) - } -}