2023-02-16 05:55:54 -08:00
|
|
|
package consumer
|
2023-01-18 07:42:14 -08:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
|
2023-07-03 11:46:47 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
|
2023-03-07 11:25:42 -08:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
|
2023-01-18 07:42:14 -08:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
|
2024-03-25 11:21:10 -07:00
|
|
|
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
2023-01-18 07:42:14 -08:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
2023-02-16 05:55:54 -08:00
|
|
|
// Consumer consumer struct definition.
|
2023-01-18 07:42:14 -08:00
|
|
|
type Consumer struct {
|
2023-11-27 07:31:35 -08:00
|
|
|
consume queue.ConsumeFunc
|
2023-03-07 11:25:42 -08:00
|
|
|
process processor.ProcessorFunc
|
2023-07-03 11:46:47 -07:00
|
|
|
metrics metrics.Metrics
|
2023-03-07 11:25:42 -08:00
|
|
|
logger *zap.Logger
|
2023-01-18 07:42:14 -08:00
|
|
|
}
|
|
|
|
|
2023-02-16 05:55:54 -08:00
|
|
|
// New creates a new vaa consumer.
|
2023-11-27 07:31:35 -08:00
|
|
|
func New(consume queue.ConsumeFunc, process processor.ProcessorFunc, metrics metrics.Metrics, logger *zap.Logger) *Consumer {
|
2023-07-03 11:46:47 -07:00
|
|
|
return &Consumer{consume: consume, process: process, metrics: metrics, logger: logger}
|
2023-01-18 07:42:14 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
// Start consumes messages from VAA queue, parse and store those messages in a repository.
|
|
|
|
func (c *Consumer) Start(ctx context.Context) {
|
|
|
|
go func() {
|
2023-02-02 09:42:23 -08:00
|
|
|
for msg := range c.consume(ctx) {
|
|
|
|
event := msg.Data()
|
2023-01-18 07:42:14 -08:00
|
|
|
|
2024-03-25 11:21:10 -07:00
|
|
|
emitterChainID := sdk.ChainID(event.ChainID).String()
|
|
|
|
|
2023-02-02 09:42:23 -08:00
|
|
|
// check id message is expired.
|
|
|
|
if msg.IsExpired() {
|
2024-03-25 11:21:10 -07:00
|
|
|
c.metrics.IncExpiredMessage(emitterChainID, event.Source)
|
2023-11-27 07:31:35 -08:00
|
|
|
c.logger.Warn("Event expired", zap.String("id", event.ID))
|
2023-02-16 05:55:54 -08:00
|
|
|
msg.Failed()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2023-11-27 07:31:35 -08:00
|
|
|
params := &processor.Params{
|
|
|
|
TrackID: event.TrackID,
|
|
|
|
Vaa: event.Vaa,
|
|
|
|
}
|
|
|
|
_, err := c.process(ctx, params)
|
2023-02-02 09:42:23 -08:00
|
|
|
if err != nil {
|
2024-03-25 11:21:10 -07:00
|
|
|
c.metrics.IncUnprocessedMessage(emitterChainID, event.Source)
|
2023-11-27 07:31:35 -08:00
|
|
|
c.logger.Error("Error processing event",
|
|
|
|
zap.String("trackId", event.TrackID),
|
2023-02-16 05:55:54 -08:00
|
|
|
zap.String("id", event.ID),
|
2023-02-02 09:42:23 -08:00
|
|
|
zap.Error(err))
|
|
|
|
msg.Failed()
|
|
|
|
continue
|
2023-11-27 07:31:35 -08:00
|
|
|
} else {
|
2024-03-25 11:21:10 -07:00
|
|
|
c.metrics.IncProcessedMessage(emitterChainID, event.Source)
|
2023-11-27 07:31:35 -08:00
|
|
|
c.logger.Debug("Event processed",
|
|
|
|
zap.String("trackId", event.TrackID),
|
|
|
|
zap.String("id", event.ID))
|
2023-01-18 07:42:14 -08:00
|
|
|
}
|
2023-02-02 09:42:23 -08:00
|
|
|
msg.Done()
|
2023-01-18 07:42:14 -08:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|