2023-02-28 12:58:26 -08:00
|
|
|
package consumer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
|
2023-06-01 12:32:00 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/analytics/metric"
|
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/analytics/queue"
|
2023-05-29 06:54:09 -07:00
|
|
|
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
2023-02-28 12:58:26 -08:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Consumer consumer struct definition.
|
|
|
|
type Consumer struct {
|
2023-11-27 07:31:35 -08:00
|
|
|
consume queue.ConsumeFunc
|
2023-02-28 12:58:26 -08:00
|
|
|
pushMetric metric.MetricPushFunc
|
|
|
|
logger *zap.Logger
|
2023-04-14 08:11:47 -07:00
|
|
|
p2pNetwork string
|
2023-02-28 12:58:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
// New creates a new vaa consumer.
|
2023-11-27 07:31:35 -08:00
|
|
|
func New(consume queue.ConsumeFunc, pushMetric metric.MetricPushFunc, logger *zap.Logger, p2pNetwork string) *Consumer {
|
2023-04-14 08:11:47 -07:00
|
|
|
return &Consumer{consume: consume, pushMetric: pushMetric, logger: logger, p2pNetwork: p2pNetwork}
|
2023-02-28 12:58:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
// Start consumes messages from VAA queue, parse and store those messages in a repository.
|
|
|
|
func (c *Consumer) Start(ctx context.Context) {
|
|
|
|
go func() {
|
|
|
|
for msg := range c.consume(ctx) {
|
|
|
|
event := msg.Data()
|
|
|
|
|
|
|
|
// check id message is expired.
|
|
|
|
if msg.IsExpired() {
|
|
|
|
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
|
|
|
|
msg.Failed()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// unmarshal vaa.
|
2023-05-29 06:54:09 -07:00
|
|
|
vaa, err := sdk.Unmarshal(event.Vaa)
|
2023-02-28 12:58:26 -08:00
|
|
|
if err != nil {
|
|
|
|
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
|
|
|
|
msg.Failed()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// push vaa metrics.
|
2023-11-27 07:31:35 -08:00
|
|
|
err = c.pushMetric(ctx, &metric.Params{TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned})
|
2023-02-28 12:58:26 -08:00
|
|
|
if err != nil {
|
|
|
|
msg.Failed()
|
|
|
|
continue
|
|
|
|
}
|
2023-05-18 07:14:36 -07:00
|
|
|
|
2023-02-28 12:58:26 -08:00
|
|
|
msg.Done()
|
2023-05-18 07:14:36 -07:00
|
|
|
c.logger.Debug("Pushed vaa metric", zap.String("id", event.ID))
|
2023-02-28 12:58:26 -08:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|