130 lines
3.5 KiB
Go
130 lines
3.5 KiB
Go
package processor
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
|
|
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
|
|
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
|
|
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// VAAQueueConsumer represents a VAA queue consumer.
|
|
type VAAQueueConsumer struct {
|
|
consume VAAQueueConsumeFunc
|
|
repository *storage.Repository
|
|
notifyFunc VAANotifyFunc
|
|
metrics metrics.Metrics
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewVAAQueueConsumer creates a new VAA queue consumer instances.
|
|
func NewVAAQueueConsumer(
|
|
consume VAAQueueConsumeFunc,
|
|
repository *storage.Repository,
|
|
notifyFunc VAANotifyFunc,
|
|
metrics metrics.Metrics,
|
|
logger *zap.Logger) *VAAQueueConsumer {
|
|
return &VAAQueueConsumer{
|
|
consume: consume,
|
|
repository: repository,
|
|
notifyFunc: notifyFunc,
|
|
metrics: metrics,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Start consumes messages from VAA queue and store those messages in a repository.
|
|
func (c *VAAQueueConsumer) Start(ctx context.Context) {
|
|
go func() {
|
|
for msg := range c.consume(ctx) {
|
|
v, err := sdk.Unmarshal(msg.Data())
|
|
if err != nil {
|
|
c.logger.Error("Error unmarshalling vaa", zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
|
|
if msg.IsExpired() {
|
|
c.logger.Warn("Message with vaa expired", zap.String("id", v.MessageID()))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
|
|
c.metrics.IncVaaConsumedFromQueue(v.EmitterChain)
|
|
|
|
c.metrics.IncConsistencyLevelByChainID(v.EmitterChain, v.ConsistencyLevel)
|
|
|
|
if v.EmitterChain != sdk.ChainIDPythNet && domain.ConsistencyLevelIsImmediately(v) {
|
|
dbVaa, err := c.repository.FindVaaByID(ctx, v.MessageID())
|
|
if err != nil {
|
|
c.logger.Error("Error finding vaa in repository",
|
|
zap.String("id", v.MessageID()),
|
|
zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
if dbVaa == nil {
|
|
err = c.repository.UpsertVaa(ctx, v, msg.Data())
|
|
if err != nil {
|
|
c.logger.Error("Error inserting vaa in repository",
|
|
zap.String("id", v.MessageID()),
|
|
zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
} else {
|
|
existingVaa, err := sdk.Unmarshal(dbVaa.Vaa)
|
|
if err != nil {
|
|
c.logger.Error("Error unmarshalling found vaa", zap.Error(err), zap.String("id", v.MessageID()))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
currentHash := v.SigningDigest()
|
|
savedHash := existingVaa.SigningDigest()
|
|
// if the hash is the same, we can skip the vaa
|
|
if currentHash.Hex() == savedHash.Hex() {
|
|
msg.Done(ctx)
|
|
continue
|
|
}
|
|
//put as dirty the vaa and save it in duplicatedVaas
|
|
err = c.repository.UpsertDuplicateVaa(ctx, v, msg.Data())
|
|
if err != nil {
|
|
c.logger.Error("Error inserting duplicate vaa in repository",
|
|
zap.String("id", v.MessageID()),
|
|
zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
c.metrics.IncDuplicateVaaByChainID(v.EmitterChain)
|
|
}
|
|
|
|
} else {
|
|
err = c.repository.UpsertVaa(ctx, v, msg.Data())
|
|
if err != nil {
|
|
c.logger.Error("Error inserting vaa in repository",
|
|
zap.String("id", v.MessageID()),
|
|
zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
}
|
|
|
|
err = c.notifyFunc(ctx, v, msg.Data())
|
|
if err != nil {
|
|
c.metrics.IncMaxSequenceCacheError(v.EmitterChain)
|
|
c.logger.Error("Error notifying vaa",
|
|
zap.String("id", v.MessageID()),
|
|
zap.Error(err))
|
|
msg.Failed()
|
|
continue
|
|
}
|
|
|
|
msg.Done(ctx)
|
|
c.logger.Info("Vaa saved in repository", zap.String("id", v.MessageID()))
|
|
}
|
|
}()
|
|
}
|