diff --git a/pipeline/cmd/main.go b/pipeline/cmd/main.go index cc7fd405..8c0b614e 100644 --- a/pipeline/cmd/main.go +++ b/pipeline/cmd/main.go @@ -69,8 +69,13 @@ func main() { // create a new pipeline repository. repository := pipeline.NewRepository(db.Database, logger) + // create and start a new tx hash handler. + quit := make(chan bool) + txHashHandler := pipeline.NewTxHashHandler(repository, pushFunc, logger, quit) + go txHashHandler.Run(rootCtx) + // create a new publisher. - publisher := pipeline.NewPublisher(pushFunc, repository, config.P2pNetwork, logger) + publisher := pipeline.NewPublisher(pushFunc, repository, config.P2pNetwork, txHashHandler, logger) watcher := watcher.NewWatcher(rootCtx, db.Database, config.MongoDatabase, publisher.Publish, logger) err = watcher.Start(rootCtx) if err != nil { @@ -95,11 +100,15 @@ func main() { logger.Info("root context cancelled, exiting...") rootCtxCancel() + logger.Info("Closing tx hash handler ...") + close(quit) + logger.Info("Closing database connections ...") db.Close() logger.Info("Closing Http server ...") server.Stop() logger.Info("Finished wormhole-explorer-pipeline") + } func newAwsConfig(appCtx context.Context, cfg *config.Configuration) (aws.Config, error) { diff --git a/pipeline/go.mod b/pipeline/go.mod index db7ee90c..f86c5123 100644 --- a/pipeline/go.mod +++ b/pipeline/go.mod @@ -16,6 +16,8 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.1.1 github.com/aws/aws-sdk-go-v2/credentials v1.1.1 github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 + github.com/golang/mock v1.6.0 + github.com/test-go/testify v1.1.4 github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-00010101000000-000000000000 github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230413131841-4f1feb38999a ) @@ -32,6 +34,7 @@ require ( github.com/aws/smithy-go v1.13.5 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/ethereum/go-ethereum v1.10.21 // indirect github.com/golang/snappy v0.0.4 // indirect @@ -43,6 +46,7 @@ require ( github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/stretchr/testify v1.8.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect @@ -60,6 +64,7 @@ require ( golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.2.0 // indirect golang.org/x/text v0.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) // Needed for cosmos-sdk based chains. See diff --git a/pipeline/go.sum b/pipeline/go.sum index c86f06d7..f38f9c9c 100644 --- a/pipeline/go.sum +++ b/pipeline/go.sum @@ -44,6 +44,8 @@ github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3 github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4= github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -98,6 +100,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= +github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= @@ -169,6 +173,7 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -176,6 +181,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pipeline/pipeline/mocks/repository.go b/pipeline/pipeline/mocks/repository.go new file mode 100644 index 00000000..37d0fa5e --- /dev/null +++ b/pipeline/pipeline/mocks/repository.go @@ -0,0 +1,65 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: repository.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + pipeline "github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline" +) + +// MockIRepository is a mock of IRepository interface. +type MockIRepository struct { + ctrl *gomock.Controller + recorder *MockIRepositoryMockRecorder +} + +// MockIRepositoryMockRecorder is the mock recorder for MockIRepository. +type MockIRepositoryMockRecorder struct { + mock *MockIRepository +} + +// NewMockIRepository creates a new mock instance. +func NewMockIRepository(ctrl *gomock.Controller) *MockIRepository { + mock := &MockIRepository{ctrl: ctrl} + mock.recorder = &MockIRepositoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockIRepository) EXPECT() *MockIRepositoryMockRecorder { + return m.recorder +} + +// GetVaaIdTxHash mocks base method. +func (m *MockIRepository) GetVaaIdTxHash(ctx context.Context, id string) (*pipeline.VaaIdTxHash, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetVaaIdTxHash", ctx, id) + ret0, _ := ret[0].(*pipeline.VaaIdTxHash) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetVaaIdTxHash indicates an expected call of GetVaaIdTxHash. +func (mr *MockIRepositoryMockRecorder) GetVaaIdTxHash(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVaaIdTxHash", reflect.TypeOf((*MockIRepository)(nil).GetVaaIdTxHash), ctx, id) +} + +// UpdateVaaDocTxHash mocks base method. +func (m *MockIRepository) UpdateVaaDocTxHash(ctx context.Context, id, txhash string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateVaaDocTxHash", ctx, id, txhash) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateVaaDocTxHash indicates an expected call of UpdateVaaDocTxHash. +func (mr *MockIRepositoryMockRecorder) UpdateVaaDocTxHash(ctx, id, txhash interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateVaaDocTxHash", reflect.TypeOf((*MockIRepository)(nil).UpdateVaaDocTxHash), ctx, id, txhash) +} diff --git a/pipeline/pipeline/publisher.go b/pipeline/pipeline/publisher.go index 04c6d169..41122669 100644 --- a/pipeline/pipeline/publisher.go +++ b/pipeline/pipeline/publisher.go @@ -2,8 +2,6 @@ package pipeline import ( "context" - "fmt" - "time" "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/pipeline/topic" @@ -14,19 +12,22 @@ import ( // Publisher definition. type Publisher struct { - logger *zap.Logger - pushFunc topic.PushFunc - repository *Repository - p2pNetwork string + logger *zap.Logger + pushFunc topic.PushFunc + repository *Repository + p2pNetwork string + txHashHandler *TxHashHandler } // NewPublisher creates a new publisher for vaa with parse configuration. -func NewPublisher(pushFunc topic.PushFunc, repository *Repository, p2pNetwork string, logger *zap.Logger) *Publisher { +func NewPublisher(pushFunc topic.PushFunc, repository *Repository, p2pNetwork string, txHashHandler *TxHashHandler, logger *zap.Logger) *Publisher { return &Publisher{ - logger: logger, - repository: repository, - pushFunc: pushFunc, - p2pNetwork: p2pNetwork} + logger: logger, + repository: repository, + pushFunc: pushFunc, + p2pNetwork: p2pNetwork, + txHashHandler: txHashHandler, + } } // Publish sends a Event for the vaa that has parse configuration defined. @@ -56,12 +57,11 @@ func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) { // discard pyth messages isPyth := domain.P2pMainNet == p.p2pNetwork && vaa.ChainIDPythNet == vaa.ChainID(e.ChainID) if !isPyth { - // retry 3 times with 2 seconds delay fixing the vaa with empty txhash. - txHash, err := Retry(p.handleEmptyVaaTxHash, 3, 2*time.Second)(ctx, e.ID) - if err != nil { - p.logger.Error("can not get txhash for vaa", zap.Error(err), zap.String("event", event.ID)) - } - event.TxHash = txHash + // add the event to the txhash handler. + // the handler will try to get the txhash for the vaa + // and publish the event with the txhash. + p.txHashHandler.AddVaaFixItem(event) + return } } @@ -71,40 +71,3 @@ func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) { p.logger.Error("can not push event to topic", zap.Error(err), zap.String("event", event.ID)) } } - -// handleEmptyVaaTxHash tries to get the txhash for the vaa with the given id. -func (p *Publisher) handleEmptyVaaTxHash(ctx context.Context, id string) (string, error) { - vaaIdTxHash, err := p.repository.GetVaaIdTxHash(ctx, id) - if err != nil { - return "", err - } - - if vaaIdTxHash.TxHash == "" { - return "", fmt.Errorf("txhash for vaa (%s) is empty", id) - } - - err = p.repository.UpdateVaaDocTxHash(ctx, id, vaaIdTxHash.TxHash) - if err != nil { - return "", err - } - return vaaIdTxHash.TxHash, nil -} - -// RetryFn is a function that can be retried. -type RetryFn func(ctx context.Context, id string) (string, error) - -// Retry retries a function. -func Retry(retryFn RetryFn, retries int, delay time.Duration) RetryFn { - return func(ctx context.Context, id string) (string, error) { - var err error - var txHash string - for i := 0; i <= retries; i++ { - txHash, err = retryFn(ctx, id) - if err == nil { - return txHash, nil - } - time.Sleep(delay) - } - return txHash, err - } -} diff --git a/pipeline/pipeline/repository.go b/pipeline/pipeline/repository.go index 2c953488..8689cd0f 100644 --- a/pipeline/pipeline/repository.go +++ b/pipeline/pipeline/repository.go @@ -1,3 +1,4 @@ +//go:generate mockgen -source=repository.go -destination=mocks/repository.go -package=mocks package pipeline import ( @@ -10,6 +11,12 @@ import ( "go.uber.org/zap" ) +// Interface +type IRepository interface { + GetVaaIdTxHash(ctx context.Context, id string) (*VaaIdTxHash, error) + UpdateVaaDocTxHash(ctx context.Context, id string, txhash string) error +} + // Repository is the repository data access layer. type Repository struct { db *mongo.Database diff --git a/pipeline/pipeline/tests/txhash_handler_test.go b/pipeline/pipeline/tests/txhash_handler_test.go new file mode 100644 index 00000000..1bf61305 --- /dev/null +++ b/pipeline/pipeline/tests/txhash_handler_test.go @@ -0,0 +1,62 @@ +package tests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/test-go/testify/assert" + "github.com/test-go/testify/require" + "github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline" + "github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline/mocks" + "github.com/wormhole-foundation/wormhole-explorer/pipeline/topic" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +func TestNewTxHashHandler(t *testing.T) { + + mock := gomock.NewController(t) + defer mock.Finish() + + repo := mocks.NewMockIRepository(mock) + + // log, _ := zap.NewDevelopment() + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + observedLogger := zap.New(observedZapCore) + + quit := make(chan bool) + + var f = topic.PushFunc(func(context.Context, *topic.Event) error { + return nil + }) + + txHashHandler := pipeline.NewTxHashHandler(repo, f, observedLogger, quit) + txHashHandler.AddVaaFixItem(topic.Event{ + ID: "vaa1", + }, + ) + + ctx := context.Background() + + repo.EXPECT().GetVaaIdTxHash(ctx, "vaa1").Return(nil, fmt.Errorf("error")) + repo.EXPECT().GetVaaIdTxHash(ctx, "vaa1").Return(&pipeline.VaaIdTxHash{ + ChainID: 1, + TxHash: "0xbabla", + }, nil) + repo.EXPECT().UpdateVaaDocTxHash(ctx, "vaa1", "0xbabla").Return(nil) + + go txHashHandler.Run(ctx) + time.Sleep(6 * time.Second) + close(quit) + + require.Equal(t, 3, observedLogs.Len()) + allLogs := observedLogs.All() + // first attempt to get txhash should fail + assert.Equal(t, "Error while trying to fix vaa txhash", allLogs[1].Message) + // second attempt to get txhash should succeed + assert.Equal(t, "Vaa txhash fixed", allLogs[2].Message) + +} diff --git a/pipeline/pipeline/txhash_handler.go b/pipeline/pipeline/txhash_handler.go new file mode 100644 index 00000000..ca0b6a91 --- /dev/null +++ b/pipeline/pipeline/txhash_handler.go @@ -0,0 +1,101 @@ +package pipeline + +import ( + "context" + "fmt" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/pipeline/topic" + "go.uber.org/zap" +) + +type ItemTuple struct { + Retries int + Event topic.Event +} + +type TxHashHandler struct { + logger *zap.Logger + repository IRepository + fixItems map[string]ItemTuple + inputQueue chan topic.Event + quit chan bool + sleepTime time.Duration + pushFunc topic.PushFunc + defaultRetries int +} + +func NewTxHashHandler(repository IRepository, pushFunc topic.PushFunc, logger *zap.Logger, quit chan bool) *TxHashHandler { + return &TxHashHandler{ + logger: logger, + repository: repository, + fixItems: map[string]ItemTuple{}, + inputQueue: make(chan topic.Event, 100), + sleepTime: 2 * time.Second, + pushFunc: pushFunc, + defaultRetries: 3, + } +} + +// Add a new element to the fixItems array +func (t *TxHashHandler) AddVaaFixItem(event topic.Event) { + t.inputQueue <- event +} + +func (t *TxHashHandler) Run(ctx context.Context) { + t.logger.Info("TxHashHandler started") + for { + select { + case <-t.quit: + t.logger.Info("stopping txhash handler") + return + case event := <-t.inputQueue: + t.fixItems[event.ID] = ItemTuple{ + Retries: 3, + Event: event, + } + default: + // no lock needed. the map is never updated while iterating. + for vaa, item := range t.fixItems { + if item.Retries > 0 { + txHash, err := t.handleEmptyVaaTxHash(ctx, vaa) + if err != nil { + t.logger.Error("Error while trying to fix vaa txhash", zap.Int("retries_count", item.Retries), zap.Error(err)) + item.Retries = item.Retries - 1 + t.fixItems[vaa] = item + } else { + t.logger.Info("Vaa txhash fixed", zap.String("vaaID", vaa), zap.String("txHash", txHash)) + item.Event.TxHash = txHash + t.pushFunc(ctx, &item.Event) + delete(t.fixItems, vaa) + + } + } else { + t.logger.Error("Vaa txhash fix failed", zap.String("vaaID", vaa)) + // publish the event to the topic anyway + t.pushFunc(ctx, &item.Event) + delete(t.fixItems, vaa) + } + } + } + time.Sleep(t.sleepTime) + } +} + +// handleEmptyVaaTxHash tries to get the txhash for the vaa with the given id. +func (p *TxHashHandler) handleEmptyVaaTxHash(ctx context.Context, id string) (string, error) { + vaaIdTxHash, err := p.repository.GetVaaIdTxHash(ctx, id) + if err != nil { + return "", err + } + + if vaaIdTxHash.TxHash == "" { + return "", fmt.Errorf("txhash for vaa (%s) is empty", id) + } + + err = p.repository.UpdateVaaDocTxHash(ctx, id, vaaIdTxHash.TxHash) + if err != nil { + return "", err + } + return vaaIdTxHash.TxHash, nil +}