wormhole-explorer/spy/grpc/handler.go

112 lines
3.3 KiB
Go

package grpc
import (
"fmt"
spyv1 "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Handler represents a GRPC subscription service handler.
type Handler struct {
spyv1.UnimplementedSpyRPCServiceServer
svs *SignedVaaSubscribers
avs *AllVaaSubscribers
logger *zap.Logger
}
// NewHandler creates a new handler of suscriptions.
func NewHandler(svs *SignedVaaSubscribers, avs *AllVaaSubscribers, logger *zap.Logger) *Handler {
return &Handler{
svs: svs,
avs: avs,
logger: logger,
}
}
// SubscribeSignedVAA implements the suscriptions of signed VAA.
func (h *Handler) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
h.logger.Info("Receiving new subscriber in signed VAA")
var fi []filterSignedVaa
if req.Filters != nil {
for _, f := range req.Filters {
switch t := f.Filter.(type) {
case *spyv1.FilterEntry_EmitterFilter:
addr, err := vaa.StringToAddress(t.EmitterFilter.EmitterAddress)
if err != nil {
h.logger.Error("Decoding emitter address", zap.Error(err))
return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode emitter address: %v", err))
}
fi = append(fi, filterSignedVaa{
chainId: vaa.ChainID(t.EmitterFilter.ChainId),
emitterAddr: addr,
})
default:
h.logger.Error("Unsupported filter type", zap.Any("filter", t))
return status.Error(codes.InvalidArgument, "unsupported filter type")
}
}
}
subscriber := h.svs.Register(fi)
defer h.svs.Unregister(subscriber)
for {
select {
case <-resp.Context().Done():
h.logger.Error("Context done", zap.String("id", subscriber.id), zap.Error(resp.Context().Err()))
return resp.Context().Err()
case msg := <-subscriber.ch:
if err := resp.Send(&spyv1.SubscribeSignedVAAResponse{
VaaBytes: msg.vaaBytes,
}); err != nil {
h.logger.Error("Sending vaas", zap.String("id", subscriber.id), zap.Error(err))
return err
}
}
}
}
// SubscribeSignedVAAByType implements the suscriptions of signed VAA by type.
func (h *Handler) SubscribeSignedVAAByType(req *spyv1.SubscribeSignedVAAByTypeRequest, resp spyv1.SpyRPCService_SubscribeSignedVAAByTypeServer) error {
h.logger.Info("Receiving new subscriber in signed VAA by type")
var fi []*spyv1.FilterEntry
if req.Filters != nil {
for _, f := range req.Filters {
switch t := f.Filter.(type) {
case *spyv1.FilterEntry_EmitterFilter:
// validate the emitter address is valid by decoding it
_, err := vaa.StringToAddress(t.EmitterFilter.EmitterAddress)
if err != nil {
return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode emitter address: %v", err))
}
fi = append(fi, &spyv1.FilterEntry{Filter: t})
case *spyv1.FilterEntry_BatchFilter,
*spyv1.FilterEntry_BatchTransactionFilter:
fi = append(fi, &spyv1.FilterEntry{Filter: t})
default:
return status.Error(codes.InvalidArgument, "unsupported filter type")
}
}
}
sub := h.avs.Register(fi)
defer h.avs.Unregister(sub)
for {
select {
case <-resp.Context().Done():
return resp.Context().Err()
case msg := <-sub.ch:
if err := resp.Send(msg); err != nil {
return err
}
}
}
}