wormhole-explorer/analytic/consumer/consumer.go

66 lines
1.8 KiB
Go

package consumer
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/analytic/metric"
"github.com/wormhole-foundation/wormhole-explorer/analytic/queue"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
vaa_sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// Consumer consumer struct definition.
type Consumer struct {
consume queue.VAAConsumeFunc
pushMetric metric.MetricPushFunc
logger *zap.Logger
p2pNetwork string
}
// New creates a new vaa consumer.
func New(consume queue.VAAConsumeFunc, pushMetric metric.MetricPushFunc, logger *zap.Logger, p2pNetwork string) *Consumer {
return &Consumer{consume: consume, pushMetric: pushMetric, logger: logger, p2pNetwork: p2pNetwork}
}
// Start consumes messages from VAA queue, parse and store those messages in a repository.
func (c *Consumer) Start(ctx context.Context) {
go func() {
for msg := range c.consume(ctx) {
event := msg.Data()
// check id message is expired.
if msg.IsExpired() {
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
msg.Failed()
continue
}
// unmarshal vaa.
vaa, err := vaa.Unmarshal(event.Vaa)
if err != nil {
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
msg.Failed()
continue
}
// filter vaa from pythnet.
if c.p2pNetwork == domain.P2pMainNet && vaa_sdk.ChainIDPythNet == vaa.EmitterChain {
c.logger.Debug("Skip vaa from pythnet", zap.String("id", event.ID))
msg.Done()
continue
}
// push vaa metrics.
err = c.pushMetric(ctx, vaa)
if err != nil {
msg.Failed()
continue
}
msg.Done()
c.logger.Info("Pushed vaa metric", zap.String("id", event.ID))
}
}()
}