Add handler for redeem events from blockchain-watcher (#986)

This commit is contained in:
ftocal 2024-01-15 10:22:05 -03:00 committed by GitHub
parent fbf5809241
commit e21b836455
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 485 additions and 75 deletions

View File

@ -17,7 +17,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/spf13/cobra v1.7.0
github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-00010101000000-000000000000
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/zap v1.24.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22

View File

@ -363,8 +363,8 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8/go.mod h1:dE12DOucCq23gjGGGhtbyx41FBxuHxjpPvG+ArO+8t0=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e h1:1Y+QtZiWzpOQiIV0YihRK44LE064qMREnsSijzmNZEw=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=

View File

@ -59,7 +59,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
return nil, err
}
if notification.Event != events.SignedVaaType && notification.Event != events.LogMessagePublishedMesageType {
if notification.Event != events.SignedVaaType && notification.Event != events.LogMessagePublishedType {
log.Debug("Skip event type", zap.String("trackId", notification.TrackID), zap.String("type", notification.Event))
return nil, nil
}
@ -81,7 +81,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
Timestamp: &signedVaa.Timestamp,
VaaIsSigned: false,
}, nil
case events.LogMessagePublishedMesageType:
case events.LogMessagePublishedType:
plm, err := events.GetEventData[events.LogMessagePublished](&notification)
if err != nil {
log.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))

View File

@ -74,6 +74,10 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}
if event == nil {
continue
}
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,

View File

@ -6,8 +6,10 @@ import (
)
const (
SignedVaaType = "signed-vaa"
LogMessagePublishedMesageType = "log-message-published"
SignedVaaType = "signed-vaa"
LogMessagePublishedType = "log-message-published"
EvmTransactionFoundType = "evm-transaction-found"
EvmTransferRedeemedName = "transfer-redeemed"
)
type NotificationEvent struct {
@ -35,7 +37,7 @@ func NewNotificationEvent[T EventData](trackID, source, _type string, data T) (*
}
type EventData interface {
SignedVaa | LogMessagePublished
SignedVaa | LogMessagePublished | EvmTransactionFound
}
func GetEventData[T EventData](e *NotificationEvent) (T, error) {
@ -62,13 +64,33 @@ type LogMessagePublished struct {
TxHash string `json:"txHash"`
BlockHeight string `json:"blockHeight"`
BlockTime time.Time `json:"blockTime"`
Attributes PublishedLogMessageAttributes `json:"attributes"`
Attributes LogMessagePublishedAttributes `json:"attributes"`
}
type PublishedLogMessageAttributes struct {
type LogMessagePublishedAttributes struct {
Sender string `json:"sender"`
Sequence uint64 `json:"sequence"`
Nonce uint32 `json:"nonce"`
Payload string `json:"payload"`
ConsistencyLevel uint8 `json:"consistencyLevel"`
}
type EvmTransactionFound struct {
ChainID int `json:"chainId"`
Emitter string `json:"emitter"`
TxHash string `json:"txHash"`
BlockHeight string `json:"blockHeight"`
BlockTime time.Time `json:"blockTime"`
Attributes EvmTransactionFoundAttributes `json:"attributes"`
}
type EvmTransactionFoundAttributes struct {
Name string `json:"name"`
EmitterChain int `json:"emitterChain"`
EmitterAddress string `json:"emitterAddress"`
Sequence uint64 `json:"sequence"`
Method string `json:"methodsByAddress"`
From string `json:"from"`
To string `json:"to"`
Status string `json:"status"`
}

View File

@ -8,7 +8,7 @@ import (
)
// TestGetEventPayload contains a test harness for the `GetEventPayload` function.
func Test_GetEventPayload(t *testing.T) {
func Test_GetSignedVaaEventPayload(t *testing.T) {
body := `{
"trackId": "63e16082da939a263512a307",
@ -55,3 +55,64 @@ func Test_GetEventPayload_Error(t *testing.T) {
_, err = GetEventData[SignedVaa](&event)
assert.Error(t, err)
}
func Test_GetEvmTransactionFoundPayload(t *testing.T) {
body := `{
"trackId": "chain-event-0xe210617eb9fdf2e970a3c9a1bc5e4ca2509066f40f0a7a663d3bcae0d60e72a9-18971205",
"source": "blockchain-watcher",
"event": "evm-transaction-found",
"timestamp": "2024-01-11T13:59:35.082Z",
"version": "1",
"data": {
"chainId": 2,
"emitter": "0x4cb69fae7e7af841e44e1a1c30af640739378bb2",
"txHash": "0xe210617eb9fdf2e970a3c9a1bc5e4ca2509066f40f0a7a663d3bcae0d60e72a9",
"blockHeight": "18971205",
"blockTime": "2024-01-09T18:31:23.000Z",
"attributes": {
"name": "transfer-redeemed",
"from": "0x8afbb9925104c39463d3b502335e3514ec92553e",
"to": "0x4cb69fae7e7af841e44e1a1c30af640739378bb2",
"status": "completed",
"blockNumber": "0x1217a45",
"input": "0x0a55d7350000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000005200000000000000000000000000000000000000000000000000000000000000640000000000000000000000000000000000000000000000000000000000000048701000000030d010e33dc45352e9983cb3a827c5ced0570f909a61e94d6336e4a55c9aa76aa74096ecdeec6e2a686491ae8051c3e804e10686cadb5f6bb88f0a92932507a6c2d6d000219df93c68219df38d6ea5e5fe72910fa1c511bb6332ca1d93bc4fd5aeb043f2c0e616f2e42b8e08ebdfc02928a0f13d70967b200a7af80ad1cffc80934b6a6e601033c6e1450d094fecd575e16536f9c30df997597401b2ad352931cb3bb598766f144fbdefb5ed141e1608e76b1c103bd95d476de60bf5562c5f1cdbc6d154ed329000541b5a745447e7467c46af651c192ebd3823740cd95c32f61de10865777b0f3310cdad20ca89d83d24651c28c9371aa16a1159affe4fd3c9d3c0b2f44565c3f360106c9413d59e5accda81ee95ac3fa88487b350528da94e57103c4ce48e71895cfd5748f005ff52b05e6aef746d56d14b8fb1df737341f75723cc7979b24d196271b01078db25338a7d16038a0da53e5803bcc3d1d23bd85e98fba2a94315aa6595f74fc7bff4238864878ff6c5c19a43ea8878d2a92afa0523a7b0113f2279bc78700bb010a74b542994b5af8d4193c0b7e3b8485a14d7f5b9ad86aa288ed830be6e654661753a6a6172e9645349485df5e7c9649f0c85ef79c448a0fff8e8c4efedfe05864000b9ea722831c60712d72471754678ffbd15b990dd984b2d6bdc3a5f935d7a2cb7416a8f00398204d47a569e2889fae1c7e3a5148b4787713b7012f296ec583514a000c057a930f767bea9a4e1d53a67286fc597d85fe9d3948f9f5982ddd9cb0b38c3f30cc8f54548b659bd74ade4916d40c27e846a79377386a4b3c0ef0646b293a29010d7aaee47c8e8ec4f2f977c8f4023dd3f7e4cd765749382a0a4f162b01fcff8ae7033036c1c99b9407fc940c4a7f345eb75707cad0e399fe46085b076c24c0005e000f44d47ce1be2287aa1a61eb47a540d0a7f2f8c45869bf115a15171d8ecf264ffc32623e9ce9848b8643f84fab8b19421734a7ca911b625d3c96a20e542daa77740110485bac142b1aead1fc29de9e4544b48fe78824e3f9e39e2776e941d3e81977a10529554c64d34cf21dcf03e0047f7613eea0531756d0b63f93c6ea997e56bd5f0012869d9aae8de982b4df0f42c31afb5edea5201a15a38990b39df55a54a0d81d9e6287d1000892a3e6a4c2f1b0fe9c252771472beae8556adde579a7e5499dfc4900659d8bfc0000000000170000000000000000000000002703483b1a5a7c577e8680de9df8be03c6f30e3c000000000000250f0101000000000000000000000000af88d065e77c8cc2239327c5edb3a432268e583100000000000000000000000000000000000000000000000000000000e87547000000000300000000000000000000a9080000000000000000000000004cb69fae7e7af841e44e1a1c30af640739378bb20000000000000000000000004cb69fae7e7af841e44e1a1c30af640739378bb20061010000000000000000000000000000000000000000000000000000000002faf080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000030eeff183dce51bd0738e931f6d7b73e232388680000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f8000000000000000300000000000000000000a90800000000000000000000000019330d10d9cc8751218eaf51e8885d058642e08a000000000000000000000000bd3fa81b58ba92a82136038b25adec7066af3155000000000000000000000000aada05bd399372f0b0463744c09113c137636f6a00000000000000000000000000000000af88d065e77c8cc2239327c5edb3a432268e58310000000000000000000000004cb69fae7e7af841e44e1a1c30af640739378bb200000000000000000000000000000000000000000000000000000000e87547000000000000000000000000002703483b1a5a7c577e8680de9df8be03c6f30e3c0000000000000000000000000000000000000000000000000000000000000000000000000000008205e0644ae6febfdd7c92c56070c85acc73a33d9f9e26258e7e39bd407a451af4530004bb163c223e53cb20e1d3e52320982d6f19ca49ff5fefeb1c696a15a9271b5dd3cae4915aa6792037a8aa4b378f4e6e62be358ee8560879d47cbaa823f6df0d3dda6973cfef81781fd5bf8731af4e65a826481914ff07dc0398f61403e3fb1c000000000000000000000000000000000000000000000000000000000000",
"methodsByAddress": "MethodRedeemTokensCCTP",
"timestamp": 1704825083,
"blockHash": "0x144f135187a3a38be44f95d5476683b8de1b32de370f9a79ea3ffbfa1d503d60",
"gas": "0x6ac2d",
"gasPrice": "0x58590ac18",
"maxFeePerGas": "0xac6f82c10",
"maxPriorityFeePerGas": "0x59682f00",
"nonce": "0x74",
"r": "0xdf1c2ece8c0c89b496c86b63b19cadf99241350f592838c0bef5d85cd7a334fe",
"s": "0x18e146e167052b0f3d0c5f2dce6db226775bf6671bb801f4573b98635eef7499",
"transactionIndex": "0x44",
"type": "0x2",
"v": "0x0",
"value": "0x0",
"sequence": 9487,
"emitterAddress": "0000000000000000000000002703483B1A5A7C577E8680DE9DF8BE03C6F30E3C",
"emitterChain": 23
}
}
}
`
event := NotificationEvent{}
err := json.Unmarshal([]byte(body), &event)
assert.NoError(t, err)
assert.Equal(t, "chain-event-0xe210617eb9fdf2e970a3c9a1bc5e4ca2509066f40f0a7a663d3bcae0d60e72a9-18971205", event.TrackID)
assert.Equal(t, "blockchain-watcher", event.Source)
assert.Equal(t, EvmTransactionFoundType, event.Event)
etf, err := GetEventData[EvmTransactionFound](&event)
assert.NoError(t, err)
assert.Equal(t, "0xe210617eb9fdf2e970a3c9a1bc5e4ca2509066f40f0a7a663d3bcae0d60e72a9", etf.TxHash)
assert.Equal(t, 23, etf.Attributes.EmitterChain)
assert.Equal(t, "0000000000000000000000002703483B1A5A7C577E8680DE9DF8BE03C6F30E3C", etf.Attributes.EmitterAddress)
assert.Equal(t, uint64(9487), etf.Attributes.Sequence)
assert.Equal(t, "0x4cb69fae7e7af841e44e1a1c30af640739378bb2", etf.Attributes.To)
assert.Equal(t, "0x8afbb9925104c39463d3b502335e3514ec92553e", etf.Attributes.From)
assert.Equal(t, "MethodRedeemTokensCCTP", etf.Attributes.Method)
assert.Equal(t, "completed", etf.Attributes.Status)
assert.Equal(t, "transfer-redeemed", etf.Attributes.Name)
}

View File

@ -18,7 +18,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/stretchr/testify v1.8.0
github.com/test-go/testify v1.1.4
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/zap v1.24.0
)

View File

@ -413,8 +413,8 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8/go.mod h1:dE12DOucCq23gjGGGhtbyx41FBxuHxjpPvG+ArO+8t0=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e h1:1Y+QtZiWzpOQiIV0YihRK44LE064qMREnsSijzmNZEw=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=

View File

@ -0,0 +1,15 @@
package repository
import (
"time"
)
type IndexingTimestamps struct {
IndexedAt time.Time `bson:"indexedAt"`
}
func IndexedAt(t time.Time) IndexingTimestamps {
return IndexingTimestamps{
IndexedAt: t,
}
}

View File

@ -19,10 +19,6 @@ type DestinationTx struct {
UpdatedAt *time.Time `bson:"updatedAt"`
}
type IndexingTimestamps struct {
IndexedAt time.Time `bson:"indexedAt"`
}
// TransactionUpdate represents a transaction document.
type TransactionUpdate struct {
ID string `bson:"_id"`

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
cwAlert "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -41,16 +42,10 @@ func NewRepository(db *mongo.Database, metrics metrics.Metrics, alerts alert.Ale
}}
}
func indexedAt(t time.Time) IndexingTimestamps {
return IndexingTimestamps{
IndexedAt: t,
}
}
func (s *Repository) UpsertGlobalTransaction(ctx context.Context, chainID sdk.ChainID, globalTx TransactionUpdate) error {
update := bson.M{
"$set": globalTx,
"$setOnInsert": indexedAt(time.Now()),
"$setOnInsert": repository.IndexedAt(time.Now()),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
@ -86,7 +81,7 @@ func (s *Repository) GetGlobalTransactionByID(ctx context.Context, id string) (T
func (s *Repository) UpdateWatcherBlock(ctx context.Context, chainID sdk.ChainID, watcherBlock WatcherBlock) error {
update := bson.M{
"$set": watcherBlock,
"$setOnInsert": indexedAt(time.Now()),
"$setOnInsert": repository.IndexedAt(time.Now()),
}
s.metrics.SetCurrentBlock(chainID, uint64(watcherBlock.BlockNumber))
_, err := s.collections.watcherBlock.UpdateByID(ctx, watcherBlock.ID, update, options.Update().SetUpsert(true))

View File

@ -8,7 +8,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/sethvargo/go-envconfig v0.6.0 // Configuration environment
github.com/stretchr/testify v1.8.1 // Testing
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/zap v1.24.0
)

View File

@ -379,8 +379,8 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8/go.mod h1:dE12DOucCq23gjGGGhtbyx41FBxuHxjpPvG+ArO+8t0=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e h1:1Y+QtZiWzpOQiIV0YihRK44LE064qMREnsSijzmNZEw=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=

View File

@ -59,7 +59,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
return nil, err
}
if notification.Event != events.SignedVaaType && notification.Event != events.LogMessagePublishedMesageType {
if notification.Event != events.SignedVaaType && notification.Event != events.LogMessagePublishedType {
log.Debug("Skip event type", zap.String("trackId", notification.TrackID), zap.String("type", notification.Event))
return nil, nil
}
@ -82,7 +82,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
Timestamp: &signedVaaEvent.Timestamp,
TxHash: signedVaaEvent.TxHash,
}, nil
case events.LogMessagePublishedMesageType:
case events.LogMessagePublishedType:
plm, err := events.GetEventData[events.LogMessagePublished](&notification)
if err != nil {
log.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))

View File

@ -1,23 +0,0 @@
# syntax=docker.io/docker/dockerfile:1.3@sha256:42399d4635eddd7a9b8a24be879d2f9a930d0ed040a61324cfdf59ef1357b3b2
FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a2dae4a09aa13d0aa56e6d23c4ec2b1e4faacf86a813 AS build
WORKDIR /app
COPY tx-tracker tx-tracker
COPY common common
COPY api api
# Build the Go app
RUN cd tx-tracker && CGO_ENABLED=0 GOOS=linux go build -o "./tx-tracker-backfiller" cmd/backfiller/main.go
############################
# STEP 2 build a small image
############################
FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/tx-tracker/tx-tracker-backfiller" "/tx-tracker-backfiller"
# Run the binary.
ENTRYPOINT ["/tx-tracker-backfiller"]

View File

@ -517,7 +517,7 @@ const jsonTxSearchResponse = `
}
`
func TestXxx1(t *testing.T) {
func TestParseSeiResponse(t *testing.T) {
result, err := parseTxSearchResponse[seiTx]([]byte(jsonTxSearchResponse), &cosmosTxSearchParams{}, seiTxSearchExtractor)
assert.NoError(t, err)
assert.NotNil(t, result)

View File

@ -57,11 +57,18 @@ func (c *Consumer) producerLoop(ctx context.Context) {
for msg := range ch {
c.logger.Debug("Received message", zap.String("vaaId", msg.Data().ID), zap.String("trackId", msg.Data().TrackID))
c.process(ctx, msg)
switch msg.Data().Type {
case queue.SourceChainEvent:
c.processSourceTx(ctx, msg)
case queue.TargetChainEvent:
c.processTargetTx(ctx, msg)
default:
c.logger.Error("Unknown message type", zap.String("trackId", msg.Data().TrackID), zap.Any("type", msg.Data().Type))
}
}
}
func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessage) {
event := msg.Data()
@ -125,7 +132,7 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
)
} else {
msg.Done()
c.logger.Info("Transaction processed successfully",
c.logger.Info("Origin transaction processed successfully",
zap.String("trackId", event.TrackID),
zap.String("id", event.ID),
elapsedLog,
@ -133,3 +140,50 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
c.metrics.IncOriginTxInserted(uint16(event.ChainID))
}
}
func (c *Consumer) processTargetTx(ctx context.Context, msg queue.ConsumerMessage) {
event := msg.Data()
attr, ok := queue.GetAttributes[*queue.TargetChainAttributes](event)
if !ok || attr == nil {
msg.Failed()
c.logger.Error("Failed to get attributes from message", zap.String("trackId", event.TrackID), zap.String("vaaId", event.ID))
return
}
start := time.Now()
// Process the VAA
p := ProcessTargetTxParams{
TrackID: event.TrackID,
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
TxHash: event.TxHash,
BlockTimestamp: event.Timestamp,
BlockHeight: attr.BlockHeight,
Method: attr.Method,
From: attr.From,
To: attr.To,
Status: attr.Status,
}
err := ProcessTargetTx(ctx, c.logger, c.repository, &p)
elapsedLog := zap.Uint64("elapsedTime", uint64(time.Since(start).Milliseconds()))
if err != nil {
msg.Failed()
c.logger.Error("Failed to process destinationTx",
zap.String("trackId", event.TrackID),
zap.String("vaaId", event.ID),
zap.Error(err),
elapsedLog,
)
} else {
msg.Done()
c.logger.Info("Destination transaction processed successfully",
zap.String("trackId", event.TrackID),
zap.String("id", event.ID),
elapsedLog,
)
}
}

View File

@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
@ -17,6 +18,25 @@ import (
"go.uber.org/zap"
)
// DestinationTx representa a destination transaction.
type DestinationTx struct {
ChainID sdk.ChainID `bson:"chainId"`
Status string `bson:"status"`
Method string `bson:"method"`
TxHash string `bson:"txHash"`
From string `bson:"from"`
To string `bson:"to"`
BlockNumber string `bson:"blockNumber"`
Timestamp *time.Time `bson:"timestamp"`
UpdatedAt *time.Time `bson:"updatedAt"`
}
// TargetTxUpdate represents a transaction document.
type TargetTxUpdate struct {
ID string `bson:"_id"`
Destination *DestinationTx `bson:"destinationTx"`
}
// Repository exposes operations over the `globalTransactions` collection.
type Repository struct {
logger *zap.Logger
@ -38,8 +58,8 @@ func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository {
return &r
}
// UpsertDocumentParams is a struct that contains the parameters for the upsertDocument method.
type UpsertDocumentParams struct {
// UpsertOriginTxParams is a struct that contains the parameters for the upsertDocument method.
type UpsertOriginTxParams struct {
VaaId string
ChainId sdk.ChainID
TxDetail *chains.TxDetail
@ -47,7 +67,7 @@ type UpsertDocumentParams struct {
Timestamp *time.Time
}
func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentParams) error {
func (r *Repository) UpsertOriginTx(ctx context.Context, params *UpsertOriginTxParams) error {
fields := bson.D{
{Key: "status", Value: params.TxStatus},
@ -381,3 +401,39 @@ func (r *Repository) GetVaaIdTxHash(ctx context.Context, id string) (*VaaIdTxHas
err := r.vaaIdTxHash.FindOne(ctx, bson.M{"_id": id}).Decode(&v)
return &v, err
}
func (r *Repository) UpsertTargetTx(ctx context.Context, globalTx *TargetTxUpdate) error {
update := bson.M{
"$set": globalTx,
"$setOnInsert": repository.IndexedAt(time.Now()),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
_, err := r.globalTransactions.UpdateByID(ctx, globalTx.ID, update, options.Update().SetUpsert(true))
if err != nil {
r.logger.Error("Error inserting target tx in global transaction", zap.Error(err))
return err
}
return err
}
// AlreadyProcessed returns true if the given VAA ID has already been processed.
func (r *Repository) GetTargetTx(ctx context.Context, vaaId string) (*TargetTxUpdate, error) {
result := r.
globalTransactions.
FindOne(ctx, bson.D{
{Key: "_id", Value: vaaId},
{Key: "destinationTx", Value: bson.D{{Key: "$exists", Value: true}}},
})
var tx TargetTxUpdate
err := result.Decode(&tx)
if err == mongo.ErrNoDocuments {
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("failed to decode already processed VAA id: %w", err)
} else {
return &tx, nil
}
}

View File

@ -15,12 +15,6 @@ import (
var ErrAlreadyProcessed = errors.New("VAA was already processed")
const (
minRetries = 3
retryDelay = 1 * time.Minute
retryDeadline = 10 * time.Minute
)
// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
type ProcessSourceTxParams struct {
TrackID string
@ -112,7 +106,7 @@ func ProcessSourceTx(
}
// Store source transaction details in the database
p := UpsertDocumentParams{
p := UpsertOriginTxParams{
VaaId: params.VaaId,
ChainId: params.ChainId,
Timestamp: params.Timestamp,
@ -120,7 +114,7 @@ func ProcessSourceTx(
TxStatus: domain.SourceTxStatusConfirmed,
}
err = repository.UpsertDocument(ctx, &p)
err = repository.UpsertOriginTx(ctx, &p)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,95 @@
package consumer
import (
"context"
"errors"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
var (
errTxFailedCannotBeUpdated = errors.New("tx with status failed can not be updated because exists a confirmed tx for the same vaa ID")
errTxUnknowCannotBeUpdated = errors.New("tx with status unknown can not be updated because exists a tx (confirmed|failed) for the same vaa ID")
errInvalidTxStatus = errors.New("invalid tx status")
)
// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
type ProcessTargetTxParams struct {
TrackID string
VaaId string
ChainId sdk.ChainID
Emitter string
TxHash string
BlockTimestamp *time.Time
BlockHeight string
Method string
From string
To string
Status string
}
func ProcessTargetTx(
ctx context.Context,
logger *zap.Logger,
repository *Repository,
params *ProcessTargetTxParams,
) error {
now := time.Now()
update := &TargetTxUpdate{
ID: params.VaaId,
Destination: &DestinationTx{
ChainID: params.ChainId,
Status: params.Status,
TxHash: params.TxHash,
BlockNumber: params.BlockHeight,
Timestamp: params.BlockTimestamp,
From: params.From,
To: params.To,
Method: params.Method,
UpdatedAt: &now,
},
}
// check if the transaction should be updated.
shoudBeUpdated, err := checkTxShouldBeUpdated(ctx, update, repository)
if !shoudBeUpdated {
logger.Warn("Transaction should not be updated", zap.String("vaaId", params.VaaId), zap.Error(err))
return nil
}
return repository.UpsertTargetTx(ctx, update)
}
func checkTxShouldBeUpdated(ctx context.Context, tx *TargetTxUpdate, repository *Repository) (bool, error) {
switch tx.Destination.Status {
case domain.DstTxStatusConfirmed:
return true, nil
case domain.DstTxStatusFailedToProcess:
// check if the transaction exists from the same vaa ID.
oldTx, err := repository.GetTargetTx(ctx, tx.ID)
if err != nil {
return true, nil
}
// if the transaction was already confirmed, then no update it.
if oldTx.Destination.Status == domain.DstTxStatusConfirmed {
return false, errTxFailedCannotBeUpdated
}
return true, nil
case domain.DstTxStatusUnkonwn:
// check if the transaction exists from the same vaa ID.
oldTx, err := repository.GetTargetTx(ctx, tx.ID)
if err != nil {
return true, nil
}
// if the transaction was already confirmed or failed to process, then no update it.
if oldTx.Destination.Status == domain.DstTxStatusConfirmed || oldTx.Destination.Status == domain.DstTxStatusFailedToProcess {
return false, errTxUnknowCannotBeUpdated
}
return true, nil
default:
return false, errInvalidTxStatus
}
}

View File

@ -125,7 +125,7 @@ require (
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230301134427-b3ec0bcc9eda
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/sys v0.10.0 // indirect

View File

@ -577,8 +577,8 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole-explorer/api v0.0.0-20230316184234-db3a54270a77 h1:jkNKRDIq7pWP/jBU4KxxArScHFpBqBCXeMakgN7W3zg=
github.com/wormhole-foundation/wormhole-explorer/api v0.0.0-20230316184234-db3a54270a77/go.mod h1:SX//TXHIIc+QMHco3wxphC5Jk6wO/lk3r9J+SZnYip8=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8/go.mod h1:dE12DOucCq23gjGGGhtbyx41FBxuHxjpPvG+ArO+8t0=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e h1:1Y+QtZiWzpOQiIV0YihRK44LE064qMREnsSijzmNZEw=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240110201643-e70f2153117e/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=

View File

@ -6,6 +6,7 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -17,11 +18,18 @@ type Controller struct {
repository *consumer.Repository
rpcProviderSettings *config.RpcProviderSettings
p2pNetwork string
metrics metrics.Metrics
}
// NewController creates a Controller instance.
func NewController(vaaRepository *Repository, repository *consumer.Repository, rpcProviderSettings *config.RpcProviderSettings, p2pNetwork string, logger *zap.Logger) *Controller {
return &Controller{vaaRepository: vaaRepository, repository: repository, rpcProviderSettings: rpcProviderSettings, p2pNetwork: p2pNetwork, logger: logger}
return &Controller{
metrics: metrics.NewDummyMetrics(),
vaaRepository: vaaRepository,
repository: repository,
rpcProviderSettings: rpcProviderSettings,
p2pNetwork: p2pNetwork,
logger: logger}
}
func (c *Controller) Process(ctx *fiber.Ctx) error {
@ -52,6 +60,7 @@ func (c *Controller) Process(ctx *fiber.Ctx) error {
Emitter: vaa.EmitterAddress.String(),
Sequence: strconv.FormatUint(vaa.Sequence, 10),
TxHash: v.TxHash,
Metrics: c.metrics,
Overwrite: true,
}

View File

@ -39,6 +39,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc {
}
return &Event{
TrackID: fmt.Sprintf("pipeline-%s", vaaEvent.ID),
Type: SourceChainEvent,
ID: vaaEvent.ID,
ChainID: vaaEvent.ChainID,
EmitterAddress: vaaEvent.EmitterAddress,
@ -59,7 +60,12 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
return nil, err
}
if notification.Event != events.SignedVaaType && notification.Event != events.LogMessagePublishedMesageType {
switch notification.Event {
case events.SignedVaaType,
events.LogMessagePublishedType,
events.EvmTransactionFoundType:
//message is valid
default:
log.Debug("Skip event type", zap.String("trackId", notification.TrackID), zap.String("type", notification.Event))
return nil, nil
}
@ -74,6 +80,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
return &Event{
TrackID: notification.TrackID,
Type: SourceChainEvent,
ID: signedVaa.ID,
ChainID: sdk.ChainID(signedVaa.EmitterChain),
EmitterAddress: signedVaa.EmitterAddress,
@ -81,7 +88,8 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
Timestamp: &signedVaa.Timestamp,
TxHash: signedVaa.TxHash,
}, nil
case events.LogMessagePublishedMesageType:
case events.LogMessagePublishedType:
plm, err := events.GetEventData[events.LogMessagePublished](&notification)
if err != nil {
log.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
@ -96,6 +104,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
return &Event{
TrackID: notification.TrackID,
Type: SourceChainEvent,
ID: vaa.MessageID(),
ChainID: sdk.ChainID(plm.ChainID),
EmitterAddress: plm.Attributes.Sender,
@ -103,6 +112,47 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
Timestamp: &plm.BlockTime,
TxHash: plm.TxHash,
}, nil
case events.EvmTransactionFoundType:
tr, err := events.GetEventData[events.EvmTransactionFound](&notification)
if err != nil {
log.Error("Error decoding transferRedeemed from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil, nil
}
address, err := sdk.StringToAddress(tr.Attributes.EmitterAddress)
if err != nil {
return nil, fmt.Errorf("error converting emitter address: %w", err)
}
vaa := sdk.VAA{
EmitterChain: sdk.ChainID(tr.Attributes.EmitterChain),
EmitterAddress: address,
Sequence: tr.Attributes.Sequence,
}
if tr.Attributes.Name != events.EvmTransferRedeemedName {
log.Warn("Skip event because it is not transfer-redeemed ", zap.String("trackId", notification.TrackID), zap.String("name", tr.Attributes.Name))
return nil, nil
}
return &Event{
TrackID: notification.TrackID,
Type: TargetChainEvent,
ID: vaa.MessageID(),
ChainID: sdk.ChainID(tr.ChainID),
EmitterAddress: tr.Attributes.EmitterAddress,
Sequence: strconv.FormatUint(tr.Attributes.Sequence, 10),
Timestamp: &tr.BlockTime,
TxHash: tr.TxHash,
Attributes: &TargetChainAttributes{
Emitter: tr.Emitter,
BlockHeight: tr.BlockHeight,
TxHash: tr.TxHash,
From: tr.Attributes.From,
To: tr.Attributes.To,
Method: tr.Attributes.Method,
Status: tr.Attributes.Status,
},
}, nil
}
return nil, nil
}

View File

@ -79,6 +79,9 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
continue
}
if event == nil {
continue
}
q.metrics.IncVaaConsumedQueue(uint16(event.ChainID))
q.wg.Add(1)

View File

@ -12,15 +12,50 @@ type sqsEvent struct {
Message string `json:"Message"`
}
type EventType string
const (
SourceChainEvent EventType = "source-chain-event"
TargetChainEvent EventType = "target-chain-event"
)
type SourceChainAttributes struct {
}
type TargetChainAttributes struct {
Emitter string
BlockHeight string
ChainID sdk.ChainID
Status string
Method string
TxHash string
From string
To string
}
type EventAttributes interface {
*SourceChainAttributes | *TargetChainAttributes
}
// Event represents a event data to be handle.
type Event struct {
TrackID string
Type EventType
ID string
ChainID sdk.ChainID
EmitterAddress string
Sequence string
Timestamp *time.Time
TxHash string
Attributes any
}
func GetAttributes[T EventAttributes](e *Event) (T, bool) {
_, ok := interface{}(e.Attributes).(T)
if ok {
return e.Attributes.(T), ok
}
return nil, ok
}
// ConsumerMessage defition.

View File

@ -0,0 +1,44 @@
package queue
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetSourceChainAttributes(t *testing.T) {
e := &Event{
TrackID: "pipeline-1",
Type: SourceChainEvent,
ID: "2/0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585/107429",
ChainID: 2,
EmitterAddress: "0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585",
Sequence: "107429",
Timestamp: nil,
TxHash: "0x7837b71e9d83b4fbff385ed4af3f70e13b820c2ba6580494bc1a205f3cd8e88c",
Attributes: &SourceChainAttributes{},
}
_, ok := GetAttributes[*SourceChainAttributes](e)
assert.True(t, ok)
}
func TestGetTargetChainAttributes(t *testing.T) {
e := &Event{
TrackID: "chain-event-1",
Type: TargetChainEvent,
ID: "2/0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585/107429",
ChainID: 2,
EmitterAddress: "9NQ5FxpSHLKBt4tHZwqp5zXKq2yFMTqtGR5iQX1BH1Ay",
Sequence: "377892",
Timestamp: nil,
TxHash: "YQ3XZQ33Uu2TV78Ms6zPtznxK5aWK3zJAbmawi46rtb126cNpnJ9B3CfK5EjTJUoYKkJp8QbTRiEsBkxD8nzDD9",
Attributes: &TargetChainAttributes{
Emitter: "9NQ5FxpSHLKBt4tHZwqp5zXKq2yFMTqtGR5iQX1BH1Ay",
BlockHeight: "183675392",
},
}
attr, ok := GetAttributes[*TargetChainAttributes](e)
assert.True(t, ok)
assert.NotNil(t, attr)
assert.Equal(t, "9NQ5FxpSHLKBt4tHZwqp5zXKq2yFMTqtGR5iQX1BH1Ay", attr.Emitter)
}