fix: handle messages when the client is down (#115)

This commit is contained in:
ftocal 2023-02-01 09:31:15 -03:00 committed by GitHub
parent cd29aaeb4d
commit 7a62307899
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 270 additions and 165 deletions

View File

@ -48,10 +48,12 @@ func main() {
logger.Info("Starting wormhole-explorer-spy ...")
svs := grpc.NewSignedVaaSubscribers()
svs := grpc.NewSignedVaaSubscribers(logger)
avs := grpc.NewAllVaaSubscribers(logger)
go svs.Start(rootCtx)
go avs.Start(rootCtx)
handler := grpc.NewHandler(svs, avs)
handler := grpc.NewHandler(svs, avs, logger)
grpcServer, err := grpc.NewServer(handler, logger, config.GrpcAddress)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
spyv1 "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@ -14,18 +15,21 @@ type Handler struct {
spyv1.UnimplementedSpyRPCServiceServer
svs *SignedVaaSubscribers
avs *AllVaaSubscribers
logger *zap.Logger
}
// NewHandler creates a new handler of suscriptions.
func NewHandler(svs *SignedVaaSubscribers, avs *AllVaaSubscribers) *Handler {
func NewHandler(svs *SignedVaaSubscribers, avs *AllVaaSubscribers, logger *zap.Logger) *Handler {
return &Handler{
svs: svs,
avs: avs,
logger: logger,
}
}
// SubscribeSignedVAA implements the suscriptions of signed VAA.
func (h *Handler) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
h.logger.Info("Receiving new subscriber in signed VAA")
var fi []filterSignedVaa
if req.Filters != nil {
for _, f := range req.Filters {
@ -33,6 +37,7 @@ func (h *Handler) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp
case *spyv1.FilterEntry_EmitterFilter:
addr, err := vaa.StringToAddress(t.EmitterFilter.EmitterAddress)
if err != nil {
h.logger.Error("Decoding emitter address", zap.Error(err))
return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode emitter address: %v", err))
}
fi = append(fi, filterSignedVaa{
@ -40,22 +45,25 @@ func (h *Handler) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp
emitterAddr: addr,
})
default:
h.logger.Error("Unsupported filter type", zap.Any("filter", t))
return status.Error(codes.InvalidArgument, "unsupported filter type")
}
}
}
id, sub := h.svs.Register(fi)
defer h.svs.Unregister(id)
subscriber := h.svs.Register(fi)
defer h.svs.Unregister(subscriber)
for {
select {
case <-resp.Context().Done():
h.logger.Error("Context done", zap.String("id", subscriber.id), zap.Error(resp.Context().Err()))
return resp.Context().Err()
case msg := <-sub.ch:
case msg := <-subscriber.ch:
if err := resp.Send(&spyv1.SubscribeSignedVAAResponse{
VaaBytes: msg.vaaBytes,
}); err != nil {
h.logger.Error("Sending vaas", zap.String("id", subscriber.id), zap.Error(err))
return err
}
}
@ -64,6 +72,7 @@ func (h *Handler) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp
// SubscribeSignedVAAByType implements the suscriptions of signed VAA by type.
func (h *Handler) SubscribeSignedVAAByType(req *spyv1.SubscribeSignedVAAByTypeRequest, resp spyv1.SpyRPCService_SubscribeSignedVAAByTypeServer) error {
h.logger.Info("Receiving new subscriber in signed VAA by type")
var fi []*spyv1.FilterEntry
if req.Filters != nil {
for _, f := range req.Filters {
@ -86,8 +95,8 @@ func (h *Handler) SubscribeSignedVAAByType(req *spyv1.SubscribeSignedVAAByTypeRe
}
}
id, sub := h.avs.Register(fi)
defer h.avs.Unregister(id)
sub := h.avs.Register(fi)
defer h.avs.Unregister(sub)
for {
select {

View File

@ -44,13 +44,19 @@ func createGRPCServer(handler *Handler, logger *zap.Logger) (context.Context, *g
func TestSubscribeSignedVAA_OK(t *testing.T) {
logger := zaptest.NewLogger(t)
svs := NewSignedVaaSubscribers()
svs := NewSignedVaaSubscribers(logger)
avs := NewAllVaaSubscribers(logger)
handler := NewHandler(svs, avs)
handler := NewHandler(svs, avs, logger)
ctx, _, client := createGRPCServer(handler, logger)
_, _, client := createGRPCServer(handler, logger)
t.Run("receive valid vaa", func(t *testing.T) {
doneSvs := make(chan bool)
ctx, cancel := context.WithCancel(context.TODO())
go func(ctx context.Context) {
defer close(doneSvs)
svs.Start(ctx)
}(ctx)
vaa := createVAA(vaa.ChainIDEthereum, emitterAddr)
vaaBytes, _ := vaa.MarshalBinary()
req := &spyv1.SubscribeSignedVAARequest{}
@ -69,14 +75,16 @@ func TestSubscribeSignedVAA_OK(t *testing.T) {
err = svs.HandleVAA(vaaBytes)
assert.Nil(t, err)
<-doneCh
cancel()
<-doneSvs
})
}
func TestSubscribeSignedVAA_Failed(t *testing.T) {
logger := zaptest.NewLogger(t)
svs := NewSignedVaaSubscribers()
svs := NewSignedVaaSubscribers(logger)
avs := NewAllVaaSubscribers(logger)
handler := NewHandler(svs, avs)
handler := NewHandler(svs, avs, logger)
ctx, _, client := createGRPCServer(handler, logger)
@ -120,13 +128,19 @@ func TestSubscribeSignedVAA_Failed(t *testing.T) {
func TestSubscribeSignedVAAByType_OK(t *testing.T) {
logger := zaptest.NewLogger(t)
svs := NewSignedVaaSubscribers()
svs := NewSignedVaaSubscribers(logger)
avs := NewAllVaaSubscribers(logger)
handler := NewHandler(svs, avs)
handler := NewHandler(svs, avs, logger)
ctx, _, client := createGRPCServer(handler, logger)
_, _, client := createGRPCServer(handler, logger)
t.Run("receive valid vaa", func(t *testing.T) {
doneAvs := make(chan bool)
ctx, cancel := context.WithCancel(context.TODO())
go func(ctx context.Context) {
defer close(doneAvs)
avs.Start(ctx)
}(ctx)
vaa := createVAA(vaa.ChainIDEthereum, emitterAddr)
vaaBytes, _ := vaa.MarshalBinary()
req := &spyv1.SubscribeSignedVAAByTypeRequest{}
@ -146,14 +160,16 @@ func TestSubscribeSignedVAAByType_OK(t *testing.T) {
err = avs.HandleVAA(vaaBytes)
assert.Nil(t, err)
<-doneCh
cancel()
<-doneAvs
})
}
func TestSubscribeSignedVAAByType_Failed(t *testing.T) {
logger := zaptest.NewLogger(t)
svs := NewSignedVaaSubscribers()
svs := NewSignedVaaSubscribers(logger)
avs := NewAllVaaSubscribers(logger)
handler := NewHandler(svs, avs)
handler := NewHandler(svs, avs, logger)
ctx, _, client := createGRPCServer(handler, logger)
@ -178,25 +194,21 @@ func TestSubscribeSignedVAAByType_Failed(t *testing.T) {
}
func waitForSignedSubscription(handler *Handler) {
for {
handler.svs.m.Lock()
tk := time.NewTicker(time.Millisecond * 100)
for range tk.C {
subs := len(handler.svs.subscribers)
handler.svs.m.Unlock()
if subs > 0 {
return
}
time.Sleep(time.Millisecond * 10)
}
}
func waitForSignedVAAByTypeSubscription(handler *Handler) {
for {
handler.avs.m.Lock()
tk := time.NewTicker(time.Millisecond * 100)
for range tk.C {
subs := len(handler.avs.subscribers)
handler.avs.m.Unlock()
if subs > 0 {
return
}
time.Sleep(time.Millisecond * 10)
}
}

View File

@ -1,8 +1,8 @@
package grpc
import (
"context"
"fmt"
"sync"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
spyv1 "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
@ -20,10 +20,12 @@ type filterSignedVaa struct {
emitterAddr vaa.Address
}
type subscriptionSignedVaa struct {
id string
filters []filterSignedVaa
ch chan message
}
type subscriptionAllVaa struct {
id string
filters []*spyv1.FilterEntry
ch chan *spyv1.SubscribeSignedVAAByTypeResponse
}
@ -34,57 +36,102 @@ func subscriptionId() string {
// SignedVaaSubscribers represents signed VAA subscribers.
type SignedVaaSubscribers struct {
m sync.Mutex
source chan []byte
subscribers map[string]*subscriptionSignedVaa
addSubscriber chan *subscriptionSignedVaa
removeSubscriber chan *subscriptionSignedVaa
logger *zap.Logger
}
// NewSignedVaaSubscribers creates a signed VAA subscribers.
func NewSignedVaaSubscribers() *SignedVaaSubscribers {
return &SignedVaaSubscribers{subscribers: make(map[string]*subscriptionSignedVaa)}
func NewSignedVaaSubscribers(logger *zap.Logger) *SignedVaaSubscribers {
return &SignedVaaSubscribers{
subscribers: make(map[string]*subscriptionSignedVaa),
addSubscriber: make(chan *subscriptionSignedVaa, 1),
removeSubscriber: make(chan *subscriptionSignedVaa, 1),
source: make(chan []byte, 1),
logger: logger,
}
}
// AllVaaSubscribers represents all VAA subscribers.
type AllVaaSubscribers struct {
m sync.Mutex
source chan []byte
subscribers map[string]*subscriptionAllVaa
addSubscriber chan *subscriptionAllVaa
removeSubscriber chan *subscriptionAllVaa
logger *zap.Logger
}
// NewAllVaaSubscribers creates all VAA subscribers.
func NewAllVaaSubscribers(logger *zap.Logger) *AllVaaSubscribers {
return &AllVaaSubscribers{subscribers: make(map[string]*subscriptionAllVaa), logger: logger}
return &AllVaaSubscribers{
subscribers: make(map[string]*subscriptionAllVaa),
addSubscriber: make(chan *subscriptionAllVaa, 1),
removeSubscriber: make(chan *subscriptionAllVaa, 1),
source: make(chan []byte, 1),
logger: logger,
}
}
// Register registers a new subscriber with a list of filters.
func (s *SignedVaaSubscribers) Register(fi []filterSignedVaa) (string, *subscriptionSignedVaa) {
s.m.Lock()
id := subscriptionId()
func (s *SignedVaaSubscribers) Register(fi []filterSignedVaa) *subscriptionSignedVaa {
sub := &subscriptionSignedVaa{
id: subscriptionId(),
ch: make(chan message, 1),
filters: fi,
}
s.subscribers[id] = sub
s.m.Unlock()
return id, sub
s.logger.Info("Registering subscriber in signed VAAs ...", zap.String("id", sub.id))
s.addSubscriber <- sub
return sub
}
// Unregister removes a subscriber.
func (s *SignedVaaSubscribers) Unregister(id string) {
s.m.Lock()
defer s.m.Unlock()
delete(s.subscribers, id)
func (s *SignedVaaSubscribers) Unregister(sub *subscriptionSignedVaa) {
s.logger.Info("Unregistering subscriber in signed VAAs ...", zap.String("id", sub.id))
s.removeSubscriber <- sub
}
// HandleVAA sends a VAA to subscribers that filters apply the conditions.
func (s *SignedVaaSubscribers) HandleVAA(vaas []byte) error {
s.m.Lock()
defer s.m.Unlock()
s.source <- vaas
return nil
}
func (s *SignedVaaSubscribers) Start(ctx context.Context) {
defer func() {
for _, subscriberByID := range s.subscribers {
if subscriberByID != nil {
close(subscriberByID.ch)
}
}
}()
for {
select {
case <-ctx.Done():
return
case newSubscriber := <-s.addSubscriber:
s.subscribers[newSubscriber.id] = newSubscriber
s.logger.Info("New subscriber registered in signed VAAs", zap.String("id", newSubscriber.id))
case subscriberToRemove := <-s.removeSubscriber:
if subscriber, exists := s.subscribers[subscriberToRemove.id]; exists {
close(subscriber.ch)
delete(s.subscribers, subscriberToRemove.id)
s.logger.Info("Subscriber unregistered in signed VAAs", zap.String("id", subscriber.id))
}
case vaas, ok := <-s.source:
if !ok {
break
}
var v *vaa.VAA
for _, sub := range s.subscribers {
if len(sub.filters) == 0 {
sub.ch <- message{vaaBytes: vaas}
select {
case sub.ch <- message{vaaBytes: vaas}:
default:
}
continue
}
@ -92,47 +139,79 @@ func (s *SignedVaaSubscribers) HandleVAA(vaas []byte) error {
var err error
v, err = vaa.Unmarshal(vaas)
if err != nil {
return err
s.logger.Error("Unmarshal vaa in signed VAAs", zap.Error(err))
break
}
}
for _, fi := range sub.filters {
if fi.chainId == v.EmitterChain && fi.emitterAddr == v.EmitterAddress {
sub.ch <- message{vaaBytes: vaas}
select {
case sub.ch <- message{vaaBytes: vaas}:
default:
}
}
}
}
return nil
}
}
}
// Register registers a new subscriber with a list of filters.
func (s *AllVaaSubscribers) Register(fi []*spyv1.FilterEntry) (string, *subscriptionAllVaa) {
s.m.Lock()
id := subscriptionId()
func (s *AllVaaSubscribers) Register(fi []*spyv1.FilterEntry) *subscriptionAllVaa {
sub := &subscriptionAllVaa{
id: subscriptionId(),
ch: make(chan *spyv1.SubscribeSignedVAAByTypeResponse, 1),
filters: fi,
}
s.subscribers[id] = sub
s.m.Unlock()
return id, sub
s.logger.Info("Registering subscriber in all VAAs ...", zap.String("id", sub.id))
s.addSubscriber <- sub
return sub
}
// Unregister removes a subscriber.
func (s *AllVaaSubscribers) Unregister(id string) {
s.m.Lock()
defer s.m.Unlock()
delete(s.subscribers, id)
func (s *AllVaaSubscribers) Unregister(sub *subscriptionAllVaa) {
s.logger.Info("Unregistering subscriber in all VAAs ...", zap.String("id", sub.id))
s.removeSubscriber <- sub
}
// HandleVAA sends a VAA to subscribers that filters apply the conditions.
func (s *AllVaaSubscribers) HandleVAA(vaaBytes []byte) error {
s.source <- vaaBytes
return nil
}
func (s *AllVaaSubscribers) Start(ctx context.Context) {
defer func() {
for _, subscriberByID := range s.subscribers {
if subscriberByID != nil {
close(subscriberByID.ch)
}
}
}()
for {
select {
case <-ctx.Done():
return
case newSubscriber := <-s.addSubscriber:
s.subscribers[newSubscriber.id] = newSubscriber
s.logger.Info("New subscriber registered in all VAAs", zap.String("id", newSubscriber.id))
case subscriberToRemove := <-s.removeSubscriber:
if subscriber, exists := s.subscribers[subscriberToRemove.id]; exists {
close(subscriber.ch)
delete(s.subscribers, subscriberToRemove.id)
s.logger.Info("Subscriber unregistered in all VAAs", zap.String("id", subscriber.id))
}
case vaaBytes, ok := <-s.source:
if !ok {
break
}
v, err := vaa.Unmarshal(vaaBytes)
if err != nil {
s.logger.Error("failed unmarshaing VAA bytes from gossipv1.SignedVAAWithQuorum.", zap.Error(err))
return err
continue
}
// resType defines which oneof proto will be retuned - res type "SignedVaa" is *gossipv1.SignedVAAWithQuorum
@ -145,14 +224,14 @@ func (s *AllVaaSubscribers) HandleVAA(vaaBytes []byte) error {
VaaType: resType,
}
s.m.Lock()
defer s.m.Unlock()
// loop through the subscriptions and send responses to everyone that wants this VAA
for _, sub := range s.subscribers {
if len(sub.filters) == 0 {
// this subscription has no filters, send them the VAA.
sub.ch <- envelope
select {
case sub.ch <- envelope:
default:
}
continue
}
@ -166,14 +245,17 @@ func (s *AllVaaSubscribers) HandleVAA(vaaBytes []byte) error {
if v.EmitterChain == filterChain && v.EmitterAddress.String() == filterAddr {
// it is a match, send the response
sub.ch <- envelope
select {
case sub.ch <- envelope:
default:
}
}
default:
s.logger.Error(fmt.Sprintf("unsupported filter type in subscriptions: %T", filter))
s.logger.Error(fmt.Sprintf("Unsupported filter type in subscriptions: %T", filter))
}
}
}
return nil
}
}
}

View File

@ -32,62 +32,65 @@ func createVAA(chainID vaa.ChainID, emitterAddr vaa.Address) *vaa.VAA {
}
func TestSignedVaaSubscribers_Register(t *testing.T) {
logger := zaptest.NewLogger(t)
var fi []filterSignedVaa
svs := NewSignedVaaSubscribers()
id, sub := svs.Register(fi)
assert.NotEmpty(t, id)
svs := NewSignedVaaSubscribers(logger)
sub := svs.Register(fi)
assert.NotNil(t, sub)
assert.NotEmpty(t, sub.id)
}
func TestSignedVaaSubscribers_Unregister(t *testing.T) {
logger := zaptest.NewLogger(t)
var fi []filterSignedVaa
svs := NewSignedVaaSubscribers()
id, _ := svs.Register(fi)
assert.Equal(t, 1, len(svs.subscribers))
svs.Unregister(id)
assert.Equal(t, 0, len(svs.subscribers))
svs := NewSignedVaaSubscribers(logger)
sub := svs.Register(fi)
assert.Equal(t, 1, len(svs.addSubscriber))
svs.Unregister(sub)
assert.Equal(t, 1, len(svs.removeSubscriber))
}
func TestSignedVaaSubscribers_HandleVAA(t *testing.T) {
t.Run("empty filters", func(t *testing.T) {
logger := zaptest.NewLogger(t)
var fi []filterSignedVaa
svs := NewSignedVaaSubscribers()
_, sub := svs.Register(fi)
svs := NewSignedVaaSubscribers(logger)
svs.Register(fi)
vaas := []byte{0x0, 0x1, 0x2, 0x3}
err := svs.HandleVAA(vaas)
assert.Nil(t, err)
msg := <-sub.ch
assert.Equal(t, vaas, msg.vaaBytes)
vaaBytes := <-svs.source
assert.Equal(t, vaas, vaaBytes)
})
t.Run("invalid vaa", func(t *testing.T) {
logger := zaptest.NewLogger(t)
fi := []filterSignedVaa{
{
chainId: 18,
emitterAddr: vaa.Address{0x0, 0x1},
},
}
svs := NewSignedVaaSubscribers()
_, _ = svs.Register(fi)
svs := NewSignedVaaSubscribers(logger)
_ = svs.Register(fi)
vaas := []byte{0x0, 0x1, 0x2, 0x3}
err := svs.HandleVAA(vaas)
assert.NotNil(t, err)
assert.Nil(t, err)
})
t.Run("filter doesn't apply", func(t *testing.T) {
logger := zaptest.NewLogger(t)
fi := []filterSignedVaa{
{
chainId: 18,
emitterAddr: vaa.Address{0x0, 0x1},
},
}
svs := NewSignedVaaSubscribers()
_, sub := svs.Register(fi)
svs := NewSignedVaaSubscribers(logger)
sub := svs.Register(fi)
vaa := createVAA(vaa.ChainIDEthereum, emitterAddr)
vaaBytes, _ := vaa.MarshalBinary()
err := svs.HandleVAA(vaaBytes)
@ -101,9 +104,9 @@ func TestAllVaaSubscribers_Register(t *testing.T) {
logger := zaptest.NewLogger(t)
avs := NewAllVaaSubscribers(logger)
id, sub := avs.Register(fi)
assert.NotEmpty(t, id)
sub := avs.Register(fi)
assert.NotNil(t, sub)
assert.NotEmpty(t, sub.id)
}
func TestAllVaaSubscribers_Unregister(t *testing.T) {
@ -111,40 +114,37 @@ func TestAllVaaSubscribers_Unregister(t *testing.T) {
logger := zaptest.NewLogger(t)
avs := NewAllVaaSubscribers(logger)
id, _ := avs.Register(fi)
sub := avs.Register(fi)
assert.Equal(t, 1, len(avs.subscribers))
avs.Unregister(id)
assert.Equal(t, 0, len(avs.subscribers))
assert.Equal(t, 1, len(avs.addSubscriber))
avs.Unregister(sub)
assert.Equal(t, 1, len(avs.removeSubscriber))
}
func TestAllVaaSubscribers_HandleVAA(t *testing.T) {
t.Run("empty filters", func(t *testing.T) {
var fi []*spyv1.FilterEntry
logger := zaptest.NewLogger(t)
avs := NewAllVaaSubscribers(logger)
_, sub := avs.Register(fi)
emitterAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}
vaa := createVAA(vaa.ChainIDEthereum, emitterAddr)
vaaBytes, _ := vaa.MarshalBinary()
err := avs.HandleVAA(vaaBytes)
assert.Nil(t, err)
msg := <-sub.ch
resp := msg.VaaType.(*spyv1.SubscribeSignedVAAByTypeResponse_SignedVaa)
assert.Equal(t, vaaBytes, resp.SignedVaa.Vaa)
msg := <-avs.source
assert.Equal(t, vaaBytes, msg)
})
t.Run("invalid vaa", func(t *testing.T) {
var fi []*spyv1.FilterEntry
logger := zaptest.NewLogger(t)
avs := NewAllVaaSubscribers(logger)
_, _ = avs.Register(fi)
_ = avs.Register(fi)
vaas := []byte{0x0, 0x1, 0x2, 0x3}
err := avs.HandleVAA(vaas)
assert.NotNil(t, err)
assert.Nil(t, err)
})
t.Run("filter doesn't apply", func(t *testing.T) {
@ -160,7 +160,7 @@ func TestAllVaaSubscribers_HandleVAA(t *testing.T) {
}
logger := zaptest.NewLogger(t)
avs := NewAllVaaSubscribers(logger)
_, sub := avs.Register(fi)
sub := avs.Register(fi)
emitterAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}
vaa := createVAA(vaa.ChainIDEthereum, emitterAddr)
vaaBytes, _ := vaa.MarshalBinary()