90 lines
2.6 KiB
Go
90 lines
2.6 KiB
Go
package consumer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
|
|
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Consumer consumer struct definition.
|
|
type Consumer struct {
|
|
consume queue.VAAConsumeFunc
|
|
process processor.ProcessorFunc
|
|
parser parser.ParserVAAAPIClient
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// New creates a new vaa consumer.
|
|
func New(consume queue.VAAConsumeFunc, process processor.ProcessorFunc, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer {
|
|
return &Consumer{consume: consume, process: process, parser: parser, logger: logger}
|
|
}
|
|
|
|
// Start consumes messages from VAA queue, parse and store those messages in a repository.
|
|
func (c *Consumer) Start(ctx context.Context) {
|
|
go func() {
|
|
for msg := range c.consume(ctx) {
|
|
event := msg.Data()
|
|
|
|
// check id message is expired.
|
|
if msg.IsExpired() {
|
|
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
|
|
// unmarshal vaa.
|
|
vaa, err := vaa.Unmarshal(event.Vaa)
|
|
if err != nil {
|
|
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
|
|
// call vaa-payload-parser api to parse a VAA.
|
|
vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, event.Sequence, vaa.Payload)
|
|
if err != nil {
|
|
if errors.Is(err, parser.ErrInternalError) {
|
|
c.logger.Info("error parsing VAA, will retry later", zap.Uint16("chainID", event.ChainID),
|
|
zap.String("address", event.EmitterAddress), zap.String("sequence", event.Sequence), zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
|
|
c.logger.Info("VAA cannot be parsed", zap.Uint16("chainID", event.ChainID),
|
|
zap.String("address", event.EmitterAddress), zap.String("sequence", event.Sequence), zap.Error(err))
|
|
msg.Done()
|
|
continue
|
|
}
|
|
|
|
// create ParsedVaaUpdate to upsert.
|
|
now := time.Now()
|
|
vaaParsed := parser.ParsedVaaUpdate{
|
|
ID: event.ID,
|
|
EmitterChain: event.ChainID,
|
|
EmitterAddr: event.EmitterAddress,
|
|
Sequence: event.Sequence,
|
|
AppID: vaaParseResponse.AppID,
|
|
Result: vaaParseResponse.Result,
|
|
Timestamp: vaa.Timestamp,
|
|
UpdatedAt: &now,
|
|
}
|
|
|
|
err = c.process(ctx, &vaaParsed)
|
|
if err != nil {
|
|
c.logger.Error("Error processing parsed vaa",
|
|
zap.String("id", event.ID),
|
|
zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
msg.Done()
|
|
}
|
|
}()
|
|
}
|