Handle new type of messages from fly and event-watcher in tx-tracker (#652)

This commit is contained in:
ftocal 2023-08-22 10:04:06 -03:00 committed by Fernando Torres
parent 2939716bd9
commit 38c92c5754
2 changed files with 68 additions and 23 deletions

View File

@ -12,25 +12,20 @@ type sqsEvent struct {
Message string `json:"Message"`
}
// VaaEvent represents a vaa data to be handle by the pipeline.
type VaaEvent struct {
ID string `json:"id"`
ChainID sdk.ChainID `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
IndexedAt time.Time `json:"indexedAt"`
Timestamp *time.Time `json:"timestamp"`
UpdatedAt *time.Time `json:"updatedAt"`
TxHash string `json:"txHash"`
Version uint16 `json:"version"`
Revision uint16 `json:"revision"`
// Event represents a vaa data to be handle by the pipeline.
type Event struct {
ID string
ChainID sdk.ChainID
EmitterAddress string
Sequence string
Vaa []byte
Timestamp *time.Time
TxHash string
}
// ConsumerMessage defition.
type ConsumerMessage interface {
Data() *VaaEvent
Data() *Event
Done()
Failed()
IsExpired() bool

View File

@ -3,13 +3,16 @@ package queue
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"go.uber.org/zap"
sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// SQSOption represents a VAA queue in SQS option function.
@ -26,7 +29,7 @@ type SQS struct {
}
// FilterConsumeFunc filter vaaa func definition.
type FilterConsumeFunc func(vaaEvent *VaaEvent) bool
type FilterConsumeFunc func(vaaEvent *Event) bool
// NewVaaSqs creates a VAA queue in SQS instances.
func NewVaaSqs(consumer *sqs_client.Consumer, metrics metrics.Metrics, logger *zap.Logger, opts ...SQSOption) *SQS {
@ -68,19 +71,25 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}
// unmarshal message to vaaEvent
var vaaEvent VaaEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent)
// unmarshal message to NotificationEvent
var notification domain.NotificationEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &notification)
if err != nil {
q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
continue
}
q.metrics.IncVaaConsumedQueue(uint16(vaaEvent.ChainID))
event := q.createEvent(&notification)
if event == nil {
continue
}
q.metrics.IncVaaConsumedQueue(uint16(event.ChainID))
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: &vaaEvent,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
@ -100,8 +109,49 @@ func (q *SQS) Close() {
close(q.ch)
}
func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
if notification.Type != domain.SignedVaaType && notification.Type != domain.PublishedLogMessageType {
q.logger.Debug("Skip event type", zap.String("trackId", notification.TrackID), zap.String("type", notification.Type))
return nil
}
switch notification.Type {
case domain.SignedVaaType:
signedVaa, err := domain.GetEventPayload[domain.SignedVaa](notification)
if err != nil {
q.logger.Error("Error decoding signedVAA from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil
}
return &Event{
ID: signedVaa.ID,
ChainID: sdk.ChainID(signedVaa.EmitterChain),
EmitterAddress: signedVaa.EmitterAddr,
Sequence: fmt.Sprintf("%d", signedVaa.Sequence),
Vaa: signedVaa.Vaa,
Timestamp: &signedVaa.Timestamp,
TxHash: signedVaa.TxHash,
}
case domain.PublishedLogMessageType:
plm, err := domain.GetEventPayload[domain.PublishedLogMessage](notification)
if err != nil {
q.logger.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil
}
return &Event{
ID: plm.ID,
ChainID: sdk.ChainID(plm.EmitterChain),
EmitterAddress: plm.EmitterAddr,
Sequence: plm.Sequence,
Vaa: plm.Vaa,
Timestamp: &plm.Timestamp,
TxHash: plm.TxHash,
}
}
return nil
}
type sqsConsumerMessage struct {
data *VaaEvent
data *Event
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
@ -110,7 +160,7 @@ type sqsConsumerMessage struct {
ctx context.Context
}
func (m *sqsConsumerMessage) Data() *VaaEvent {
func (m *sqsConsumerMessage) Data() *Event {
return m.data
}