Handle new type of messages from fly and event-watcher in parser (#648)

This commit is contained in:
ftocal 2023-08-21 15:44:16 -03:00 committed by Fernando Torres
parent d83b01421e
commit d5fe356ca1
6 changed files with 155 additions and 36 deletions

View File

@ -5,6 +5,11 @@ import (
"time"
)
const (
SignedVaaType = "signed-vaa"
PublishedLogMessageType = "published-log-message"
)
type NotificationEvent struct {
TrackID string `json:"trackId"`
Source string `json:"source"`
@ -12,6 +17,19 @@ type NotificationEvent struct {
Payload json.RawMessage `json:"payload"`
}
func NewNotificationEvent[T EventPayload](trackID, source, _type string, payload T) (*NotificationEvent, error) {
p, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return &NotificationEvent{
TrackID: trackID,
Source: source,
Type: _type,
Payload: json.RawMessage(p),
}, nil
}
type EventPayload interface {
SignedVaa | PublishedLogMessage
}
@ -24,22 +42,22 @@ func GetEventPayload[T EventPayload](e *NotificationEvent) (T, error) {
type SignedVaa struct {
ID string `json:"id"`
EmitterChain int `json:"emitterChain"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex int `json:"guardianSetIndex"`
Sequence uint64 `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Timestamp time.Time `json:"timestamp"`
Vaa string `json:"vaa"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
Version int `json:"version"`
}
type PublishedLogMessage struct {
ID string `json:"id"`
EmitterChain int `json:"emitterChain"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence string `json:"sequence"`
Timestamp time.Time `json:"timestamp"`
Vaa string `json:"vaa"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
}

View File

@ -0,0 +1,57 @@
package domain
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
)
// TestGetEventPayload contains a test harness for the `GetEventPayload` function.
func Test_GetEventPayload(t *testing.T) {
body := `{
"trackId": "63e16082da939a263512a307",
"source": "fly",
"type": "signed-vaa",
"payload": {
"id": "2/000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7/162727",
"emitterChain": 2,
"emitterAddr": "000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7",
"sequence": 162727,
"guardianSetIndex": 0,
"timestamp": "2023-08-04T11:43:48.000Z",
"vaa": "010000000001005defe63f46c192b506758684fada6b97f5a8ee287a82efefa35c59dcf369a83b1abfe5431ad51a31051bf42851b5f699421e525745db03e8bc43a6b36dde6fc00064cd0ea4446900000002000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d70000000000027ba7010300000000000000000000000000000000000000000000000000000000004c4b40000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d600026d9ae6b2d333c1d65301a59da3eed388ca5dc60cb12496584b75cbe6b15fdbed002000000000000000000000000072b916142650cb48bbbed0acaeb5b287d1c55d917b2262617369635f726563697069656e74223a7b22726563697069656e74223a22633256704d58426f4e445631626a646a4e6a426c6448566d6432317964575272617a4a3061336877647a4e6f595859794e6d4e6d5a6a5933227d7d",
"txHash" : "406065c15b62426c51f987f5923fb376f6b60cb1c15724cc5460a08d18ccc337",
"version" : 1
}
}`
event := NotificationEvent{}
err := json.Unmarshal([]byte(body), &event)
assert.NoError(t, err)
assert.Equal(t, "63e16082da939a263512a307", event.TrackID)
assert.Equal(t, "fly", event.Source)
assert.Equal(t, SignedVaaType, event.Type)
signedVaa, err := GetEventPayload[SignedVaa](&event)
assert.NoError(t, err)
assert.Equal(t, "2/000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7/162727", signedVaa.ID)
}
func Test_GetEventPayload_Error(t *testing.T) {
body := `{
"trackId": "63e16082da939a263512a307",
"source": "fly",
"type": "signed-vaa"
}`
event := NotificationEvent{}
err := json.Unmarshal([]byte(body), &event)
assert.NoError(t, err)
assert.Equal(t, "63e16082da939a263512a307", event.TrackID)
assert.Equal(t, "fly", event.Source)
assert.Equal(t, SignedVaaType, event.Type)
_, err = GetEventPayload[SignedVaa](&event)
assert.Error(t, err)
}

View File

@ -30,7 +30,7 @@ func (c *Consumer) Start(ctx context.Context) {
// check id message is expired.
if msg.IsExpired() {
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
c.logger.Warn("Notification event expired", zap.String("id", event.ID))
msg.Failed()
continue
}
@ -38,7 +38,7 @@ func (c *Consumer) Start(ctx context.Context) {
_, err := c.process(ctx, event.Vaa)
if err != nil {
c.logger.Error("Error processing parsed vaa",
c.logger.Error("Error processing notification event",
zap.String("id", event.ID),
zap.Error(err))
msg.Failed()

View File

@ -3,11 +3,11 @@ package queue
import "github.com/wormhole-foundation/wormhole/sdk/vaa"
// PythFilter filter vaa event from pyth chain.
func PythFilter(vaaEvent *VaaEvent) bool {
return vaaEvent.ChainID == uint16(vaa.ChainIDPythNet)
func PythFilter(event *Event) bool {
return event.ChainID == uint16(vaa.ChainIDPythNet)
}
// NonFilter non filter vaa evant.
func NonFilter(vaaEvent *VaaEvent) bool {
func NonFilter(event *Event) bool {
return false
}

View File

@ -10,25 +10,20 @@ type sqsEvent struct {
Message string `json:"Message"`
}
// VaaEvent represents a vaa data to be handle by the pipeline.
type VaaEvent struct {
ID string `json:"id"`
ChainID uint16 `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
IndexedAt time.Time `json:"indexedAt"`
Timestamp *time.Time `json:"timestamp"`
UpdatedAt *time.Time `json:"updatedAt"`
TxHash string `json:"txHash"`
Version uint16 `json:"version"`
Revision uint16 `json:"revision"`
// Event represents a vaa data to be handle by the pipeline.
type Event struct {
ID string
ChainID uint16
EmitterAddress string
Sequence string
Vaa []byte
Timestamp *time.Time
TxHash string
}
// ConsumerMessage defition.
type ConsumerMessage interface {
Data() *VaaEvent
Data() *Event
Done()
Failed()
IsExpired() bool

View File

@ -3,9 +3,11 @@ package queue
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"go.uber.org/zap"
@ -26,7 +28,7 @@ type SQS struct {
}
// FilterConsumeFunc filter vaaa func definition.
type FilterConsumeFunc func(vaaEvent *VaaEvent) bool
type FilterConsumeFunc func(vaaEvent *Event) bool
// NewVAASQS creates a VAA queue in SQS instances.
func NewVAASQS(consumer *sqs.Consumer, filterConsume FilterConsumeFunc, metrics metrics.Metrics, logger *zap.Logger, opts ...SQSOption) *SQS {
@ -70,28 +72,34 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}
// unmarshal message to vaaEvent
var vaaEvent VaaEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent)
// unmarshal message to NotificationEvent
var notification domain.NotificationEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &notification)
if err != nil {
q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
continue
}
q.metrics.IncVaaConsumedQueue(vaaEvent.ChainID)
event := q.createEvent(&notification)
if event == nil {
continue
}
q.metrics.IncVaaConsumedQueue(event.ChainID)
// filter vaaEvent by p2p net.
if q.filterConsume(&vaaEvent) {
if q.filterConsume(event) {
if err := q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
continue
}
q.metrics.IncVaaUnfiltered(vaaEvent.ChainID)
q.metrics.IncVaaUnfiltered(event.ChainID)
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: &vaaEvent,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
@ -111,8 +119,49 @@ func (q *SQS) Close() {
close(q.ch)
}
func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
if notification.Type != domain.SignedVaaType && notification.Type != domain.PublishedLogMessageType {
q.logger.Debug("Skip event type", zap.String("trackId", notification.TrackID), zap.String("type", notification.Type))
return nil
}
switch notification.Type {
case domain.SignedVaaType:
signedVaa, err := domain.GetEventPayload[domain.SignedVaa](notification)
if err != nil {
q.logger.Error("Error decoding signedVAA from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil
}
return &Event{
ID: signedVaa.ID,
ChainID: signedVaa.EmitterChain,
EmitterAddress: signedVaa.EmitterAddr,
Sequence: fmt.Sprintf("%d", signedVaa.Sequence),
Vaa: signedVaa.Vaa,
Timestamp: &signedVaa.Timestamp,
TxHash: signedVaa.TxHash,
}
case domain.PublishedLogMessageType:
plm, err := domain.GetEventPayload[domain.PublishedLogMessage](notification)
if err != nil {
q.logger.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil
}
return &Event{
ID: plm.ID,
ChainID: plm.EmitterChain,
EmitterAddress: plm.EmitterAddr,
Sequence: plm.Sequence,
Vaa: plm.Vaa,
Timestamp: &plm.Timestamp,
TxHash: plm.TxHash,
}
}
return nil
}
type sqsConsumerMessage struct {
data *VaaEvent
data *Event
consumer *sqs.Consumer
wg *sync.WaitGroup
id *string
@ -121,7 +170,7 @@ type sqsConsumerMessage struct {
ctx context.Context
}
func (m *sqsConsumerMessage) Data() *VaaEvent {
func (m *sqsConsumerMessage) Data() *Event {
return m.data
}