return ErrorOverflow on Subscribe if server is overflowed
> why we need it? most of our subscribers will be RPC WS subscribers, so if there are too many, nothing wrong with rejecting to subscribe. however, consensus reactor must be the first to subscribe, since its work depends on the pubsub package.
This commit is contained in:
parent
13207a5927
commit
0006bfc359
|
@ -17,8 +17,8 @@ func TestExample(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
ch := make(chan interface{}, 1)
|
ch := make(chan interface{}, 1)
|
||||||
s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch)
|
err := s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch)
|
||||||
err := s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"})
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"})
|
||||||
assertReceive(t, "Tombstone", ch)
|
assertReceive(t, "Tombstone", ch)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ package pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
cmn "github.com/tendermint/tmlibs/common"
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
)
|
)
|
||||||
|
@ -28,8 +29,10 @@ const (
|
||||||
shutdown
|
shutdown
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const subscribeTimeout = 10 * time.Millisecond
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrorOverflow = errors.New("Server overflowed")
|
ErrorOverflow = errors.New("server overflowed")
|
||||||
)
|
)
|
||||||
|
|
||||||
type cmd struct {
|
type cmd struct {
|
||||||
|
@ -52,6 +55,7 @@ type Server struct {
|
||||||
cmn.BaseService
|
cmn.BaseService
|
||||||
|
|
||||||
cmds chan cmd
|
cmds chan cmd
|
||||||
|
cmdsCap int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option sets a parameter for the server.
|
// Option sets a parameter for the server.
|
||||||
|
@ -68,9 +72,8 @@ func NewServer(options ...Option) *Server {
|
||||||
option(s)
|
option(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.cmds == nil { // if BufferCapacity was not set, create unbuffered channel
|
// if BufferCapacity option was not set, the channel is unbuffered
|
||||||
s.cmds = make(chan cmd)
|
s.cmds = make(chan cmd, s.cmdsCap)
|
||||||
}
|
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
@ -82,40 +85,49 @@ func NewServer(options ...Option) *Server {
|
||||||
func BufferCapacity(cap int) Option {
|
func BufferCapacity(cap int) Option {
|
||||||
return func(s *Server) {
|
return func(s *Server) {
|
||||||
if cap > 0 {
|
if cap > 0 {
|
||||||
s.cmds = make(chan cmd, cap)
|
s.cmdsCap = cap
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns capacity of the internal server's queue.
|
||||||
|
func (s Server) BufferCapacity() int {
|
||||||
|
return s.cmdsCap
|
||||||
|
}
|
||||||
|
|
||||||
// Subscribe returns a channel on which messages matching the given query can
|
// Subscribe returns a channel on which messages matching the given query can
|
||||||
// be received. If the subscription already exists old channel will be closed
|
// be received. If the subscription already exists old channel will be closed
|
||||||
// and new one returned.
|
// and new one returned. Error will be returned to the caller if the server is
|
||||||
func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) {
|
// overflowed.
|
||||||
s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}
|
func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) error {
|
||||||
|
select {
|
||||||
|
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
|
||||||
|
return nil
|
||||||
|
case <-time.After(subscribeTimeout):
|
||||||
|
return ErrorOverflow
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe unsubscribes the given client from the query.
|
// Unsubscribe unsubscribes the given client from the query. Blocking.
|
||||||
func (s *Server) Unsubscribe(clientID string, query Query) {
|
func (s *Server) Unsubscribe(clientID string, query Query) {
|
||||||
s.cmds <- cmd{op: unsub, clientID: clientID, query: query}
|
s.cmds <- cmd{op: unsub, clientID: clientID, query: query}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe unsubscribes the given channel.
|
// Unsubscribe unsubscribes the given channel. Blocking.
|
||||||
func (s *Server) UnsubscribeAll(clientID string) {
|
func (s *Server) UnsubscribeAll(clientID string) {
|
||||||
s.cmds <- cmd{op: unsub, clientID: clientID}
|
s.cmds <- cmd{op: unsub, clientID: clientID}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish publishes the given message.
|
// Publish publishes the given message. Blocking.
|
||||||
func (s *Server) Publish(msg interface{}) error {
|
func (s *Server) Publish(msg interface{}) {
|
||||||
return s.PublishWithTags(msg, make(map[string]interface{}))
|
s.PublishWithTags(msg, make(map[string]interface{}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishWithTags publishes the given message with a set of tags. This set of
|
// PublishWithTags publishes the given message with a set of tags. This set of
|
||||||
// tags will be matched with client queries. If there is a match, the message
|
// tags will be matched with client queries. If there is a match, the message
|
||||||
// will be sent to a client.
|
// will be sent to a client. Blocking.
|
||||||
func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error {
|
func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) {
|
||||||
pubCmd := cmd{op: pub, msg: msg, tags: tags}
|
s.cmds <- cmd{op: pub, msg: msg, tags: tags}
|
||||||
s.cmds <- pubCmd
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnStop implements Service.OnStop by shutting down the server.
|
// OnStop implements Service.OnStop by shutting down the server.
|
||||||
|
|
|
@ -25,13 +25,12 @@ func TestSubscribe(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
ch := make(chan interface{}, 1)
|
ch := make(chan interface{}, 1)
|
||||||
s.Subscribe(clientID, query.Empty{}, ch)
|
err := s.Subscribe(clientID, query.Empty{}, ch)
|
||||||
err := s.Publish("Ka-Zar")
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
s.Publish("Ka-Zar")
|
||||||
assertReceive(t, "Ka-Zar", ch)
|
assertReceive(t, "Ka-Zar", ch)
|
||||||
|
|
||||||
err = s.Publish("Quicksilver")
|
s.Publish("Quicksilver")
|
||||||
require.NoError(t, err)
|
|
||||||
assertReceive(t, "Quicksilver", ch)
|
assertReceive(t, "Quicksilver", ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,22 +40,22 @@ func TestDifferentClients(t *testing.T) {
|
||||||
s.Start()
|
s.Start()
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
ch1 := make(chan interface{}, 1)
|
ch1 := make(chan interface{}, 1)
|
||||||
s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1)
|
err := s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1)
|
||||||
err := s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
|
||||||
assertReceive(t, "Iceman", ch1)
|
assertReceive(t, "Iceman", ch1)
|
||||||
|
|
||||||
ch2 := make(chan interface{}, 1)
|
ch2 := make(chan interface{}, 1)
|
||||||
s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2)
|
err = s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2)
|
||||||
err = s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
|
||||||
assertReceive(t, "Ultimo", ch1)
|
assertReceive(t, "Ultimo", ch1)
|
||||||
assertReceive(t, "Ultimo", ch2)
|
assertReceive(t, "Ultimo", ch2)
|
||||||
|
|
||||||
ch3 := make(chan interface{}, 1)
|
ch3 := make(chan interface{}, 1)
|
||||||
s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3)
|
err = s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3)
|
||||||
err = s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
|
||||||
assert.Zero(t, len(ch3))
|
assert.Zero(t, len(ch3))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,19 +68,19 @@ func TestClientResubscribes(t *testing.T) {
|
||||||
q := query.MustParse("tm.events.type=NewBlock")
|
q := query.MustParse("tm.events.type=NewBlock")
|
||||||
|
|
||||||
ch1 := make(chan interface{}, 1)
|
ch1 := make(chan interface{}, 1)
|
||||||
s.Subscribe(clientID, q, ch1)
|
err := s.Subscribe(clientID, q, ch1)
|
||||||
err := s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
|
||||||
assertReceive(t, "Goblin Queen", ch1)
|
assertReceive(t, "Goblin Queen", ch1)
|
||||||
|
|
||||||
ch2 := make(chan interface{}, 1)
|
ch2 := make(chan interface{}, 1)
|
||||||
s.Subscribe(clientID, q, ch2)
|
err = s.Subscribe(clientID, q, ch2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, ok := <-ch1
|
_, ok := <-ch1
|
||||||
assert.False(t, ok)
|
assert.False(t, ok)
|
||||||
|
|
||||||
err = s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
|
s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
|
||||||
require.NoError(t, err)
|
|
||||||
assertReceive(t, "Spider-Man", ch2)
|
assertReceive(t, "Spider-Man", ch2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,11 +91,11 @@ func TestUnsubscribe(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
ch := make(chan interface{})
|
ch := make(chan interface{})
|
||||||
s.Subscribe(clientID, query.Empty{}, ch)
|
err := s.Subscribe(clientID, query.Empty{}, ch)
|
||||||
|
require.NoError(t, err)
|
||||||
s.Unsubscribe(clientID, query.Empty{})
|
s.Unsubscribe(clientID, query.Empty{})
|
||||||
|
|
||||||
err := s.Publish("Nick Fury")
|
s.Publish("Nick Fury")
|
||||||
require.NoError(t, err)
|
|
||||||
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
|
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
|
||||||
|
|
||||||
_, ok := <-ch
|
_, ok := <-ch
|
||||||
|
@ -110,13 +109,14 @@ func TestUnsubscribeAll(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
|
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
|
||||||
s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1)
|
err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1)
|
||||||
s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2)
|
require.NoError(t, err)
|
||||||
|
err = s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
s.UnsubscribeAll(clientID)
|
s.UnsubscribeAll(clientID)
|
||||||
|
|
||||||
err := s.Publish("Nick Fury")
|
s.Publish("Nick Fury")
|
||||||
require.NoError(t, err)
|
|
||||||
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
|
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
|
||||||
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
|
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
|
||||||
|
|
||||||
|
@ -130,10 +130,19 @@ func TestBufferCapacity(t *testing.T) {
|
||||||
s := pubsub.NewServer(pubsub.BufferCapacity(2))
|
s := pubsub.NewServer(pubsub.BufferCapacity(2))
|
||||||
s.SetLogger(log.TestingLogger())
|
s.SetLogger(log.TestingLogger())
|
||||||
|
|
||||||
err := s.Publish("Nighthawk")
|
s.Publish("Nighthawk")
|
||||||
require.NoError(t, err)
|
s.Publish("Sage")
|
||||||
err = s.Publish("Sage")
|
}
|
||||||
require.NoError(t, err)
|
|
||||||
|
func TestSubscribeReturnsErrorIfServerOverflowed(t *testing.T) {
|
||||||
|
s := pubsub.NewServer()
|
||||||
|
s.SetLogger(log.TestingLogger())
|
||||||
|
|
||||||
|
ch := make(chan interface{}, 1)
|
||||||
|
err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch)
|
||||||
|
if assert.Error(t, err) {
|
||||||
|
assert.Equal(t, pubsub.ErrorOverflow, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
|
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
|
||||||
|
|
Loading…
Reference in New Issue