diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 34df86a4..f5df418a 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -9,8 +9,6 @@ // When some message is published, we match it with all queries. If there is a // match, this message will be pushed to all clients, subscribed to that query. // See query subpackage for our implementation. -// -// Subscribe/Unsubscribe calls are always blocking. package pubsub import ( @@ -42,7 +40,7 @@ type Query interface { Matches(tags map[string]interface{}) bool } -// Server allows clients to subscribe/unsubscribe for messages, pubsling +// Server allows clients to subscribe/unsubscribe for messages, publishing // messages with or without tags, and manages internal state. type Server struct { cmn.BaseService @@ -83,15 +81,15 @@ func BufferCapacity(cap int) Option { } } -// Returns capacity of the internal server's queue. +// BufferCapacity returns capacity of the internal server's queue. func (s Server) BufferCapacity() int { return s.cmdsCap } -// Subscribe returns a channel on which messages matching the given query can -// be received. If the subscription already exists old channel will be closed -// and new one returned. Error will be returned to the caller if the context is -// cancelled. +// Subscribe creates a subscription for the given client. It accepts a channel +// on which messages matching the given query can be received. If the +// subscription already exists, the old channel will be closed. An error will +// be returned to the caller if the context is canceled. func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { select { case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: @@ -101,8 +99,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou } } -// Unsubscribe unsubscribes the given client from the query. Error will be -// returned to the caller if the context is cancelled. +// Unsubscribe removes the subscription on the given query. An error will be +// returned to the caller if the context is canceled. func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { select { case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: @@ -112,7 +110,8 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) } } -// Unsubscribe unsubscribes the given channel. Blocking. +// Unsubscribe removes all client subscriptions. An error will be returned to +// the caller if the context is canceled. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { select { case s.cmds <- cmd{op: unsub, clientID: clientID}: @@ -122,14 +121,15 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { } } -// Publish publishes the given message. Blocking. +// Publish publishes the given message. An error will be returned to the caller +// if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { return s.PublishWithTags(ctx, msg, make(map[string]interface{})) } -// PublishWithTags publishes the given message with a set of tags. This set of -// tags will be matched with client queries. If there is a match, the message -// will be sent to a client. Blocking. +// PublishWithTags publishes the given message with the set of tags. The set is +// matched with clients queries. If there is a match, the message is sent to +// the client. func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { select { case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: @@ -152,7 +152,7 @@ type state struct { clients map[string]map[Query]struct{} } -// OnStart implements Service.OnStart by creating a main loop. +// OnStart implements Service.OnStart by starting the server. func (s *Server) OnStart() error { go s.loop(state{ queries: make(map[Query]map[string]chan<- interface{}), @@ -194,6 +194,8 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) { close(oldCh) } } + + // create subscription state.queries[q][clientID] = ch // add client if needed @@ -201,10 +203,6 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) { state.clients[clientID] = make(map[Query]struct{}) } state.clients[clientID][q] = struct{}{} - - // create subscription - clientToChannelMap := state.queries[q] - clientToChannelMap[clientID] = ch } func (state *state) remove(clientID string, q Query) { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 9c984144..9d003cff 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -153,6 +153,13 @@ func TestBufferCapacity(t *testing.T) { require.NoError(t, err) err = s.Publish(ctx, "Sage") require.NoError(t, err) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + err = s.Publish(ctx, "Ironclad") + if assert.Error(t, err) { + assert.Equal(t, context.DeadlineExceeded, err) + } } func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }