diff --git a/tx-tracker/queue/queue.go b/tx-tracker/queue/queue.go index 2b6990b8..c0f26adb 100644 --- a/tx-tracker/queue/queue.go +++ b/tx-tracker/queue/queue.go @@ -12,25 +12,20 @@ type sqsEvent struct { Message string `json:"Message"` } -// VaaEvent represents a vaa data to be handle by the pipeline. -type VaaEvent struct { - ID string `json:"id"` - ChainID sdk.ChainID `json:"emitterChain"` - EmitterAddress string `json:"emitterAddr"` - Sequence string `json:"sequence"` - GuardianSetIndex uint32 `json:"guardianSetIndex"` - Vaa []byte `json:"vaas"` - IndexedAt time.Time `json:"indexedAt"` - Timestamp *time.Time `json:"timestamp"` - UpdatedAt *time.Time `json:"updatedAt"` - TxHash string `json:"txHash"` - Version uint16 `json:"version"` - Revision uint16 `json:"revision"` +// Event represents a vaa data to be handle by the pipeline. +type Event struct { + ID string + ChainID sdk.ChainID + EmitterAddress string + Sequence string + Vaa []byte + Timestamp *time.Time + TxHash string } // ConsumerMessage defition. type ConsumerMessage interface { - Data() *VaaEvent + Data() *Event Done() Failed() IsExpired() bool diff --git a/tx-tracker/queue/vaa_sqs.go b/tx-tracker/queue/vaa_sqs.go index 69de874c..3fc146e4 100644 --- a/tx-tracker/queue/vaa_sqs.go +++ b/tx-tracker/queue/vaa_sqs.go @@ -3,13 +3,16 @@ package queue import ( "context" "encoding/json" + "fmt" "sync" "time" "go.uber.org/zap" sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs" + "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" ) // SQSOption represents a VAA queue in SQS option function. @@ -26,7 +29,7 @@ type SQS struct { } // FilterConsumeFunc filter vaaa func definition. -type FilterConsumeFunc func(vaaEvent *VaaEvent) bool +type FilterConsumeFunc func(vaaEvent *Event) bool // NewVaaSqs creates a VAA queue in SQS instances. func NewVaaSqs(consumer *sqs_client.Consumer, metrics metrics.Metrics, logger *zap.Logger, opts ...SQSOption) *SQS { @@ -68,19 +71,25 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { continue } - // unmarshal message to vaaEvent - var vaaEvent VaaEvent - err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent) + // unmarshal message to NotificationEvent + var notification domain.NotificationEvent + err = json.Unmarshal([]byte(sqsEvent.Message), ¬ification) if err != nil { q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err)) continue } - q.metrics.IncVaaConsumedQueue(uint16(vaaEvent.ChainID)) + + event := q.createEvent(¬ification) + if event == nil { + continue + } + + q.metrics.IncVaaConsumedQueue(uint16(event.ChainID)) q.wg.Add(1) q.ch <- &sqsConsumerMessage{ id: msg.ReceiptHandle, - data: &vaaEvent, + data: event, wg: &q.wg, logger: q.logger, consumer: q.consumer, @@ -100,8 +109,49 @@ func (q *SQS) Close() { close(q.ch) } +func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event { + if notification.Type != domain.SignedVaaType && notification.Type != domain.PublishedLogMessageType { + q.logger.Debug("Skip event type", zap.String("trackId", notification.TrackID), zap.String("type", notification.Type)) + return nil + } + + switch notification.Type { + case domain.SignedVaaType: + signedVaa, err := domain.GetEventPayload[domain.SignedVaa](notification) + if err != nil { + q.logger.Error("Error decoding signedVAA from notification event", zap.String("trackId", notification.TrackID), zap.Error(err)) + return nil + } + return &Event{ + ID: signedVaa.ID, + ChainID: sdk.ChainID(signedVaa.EmitterChain), + EmitterAddress: signedVaa.EmitterAddr, + Sequence: fmt.Sprintf("%d", signedVaa.Sequence), + Vaa: signedVaa.Vaa, + Timestamp: &signedVaa.Timestamp, + TxHash: signedVaa.TxHash, + } + case domain.PublishedLogMessageType: + plm, err := domain.GetEventPayload[domain.PublishedLogMessage](notification) + if err != nil { + q.logger.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err)) + return nil + } + return &Event{ + ID: plm.ID, + ChainID: sdk.ChainID(plm.EmitterChain), + EmitterAddress: plm.EmitterAddr, + Sequence: plm.Sequence, + Vaa: plm.Vaa, + Timestamp: &plm.Timestamp, + TxHash: plm.TxHash, + } + } + return nil +} + type sqsConsumerMessage struct { - data *VaaEvent + data *Event consumer *sqs_client.Consumer wg *sync.WaitGroup id *string @@ -110,7 +160,7 @@ type sqsConsumerMessage struct { ctx context.Context } -func (m *sqsConsumerMessage) Data() *VaaEvent { +func (m *sqsConsumerMessage) Data() *Event { return m.data }