Improve txhash handler (#237)

* txHash separate handler

* remove unused code

* tests

* improve test and fix error

---------

Co-authored-by: gipsh <gipsh@gmail.com>
This commit is contained in:
gipsh 2023-04-17 10:42:28 -03:00 committed by GitHub
parent 40956d2391
commit 3c597917f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 273 additions and 55 deletions

View File

@ -69,8 +69,13 @@ func main() {
// create a new pipeline repository.
repository := pipeline.NewRepository(db.Database, logger)
// create and start a new tx hash handler.
quit := make(chan bool)
txHashHandler := pipeline.NewTxHashHandler(repository, pushFunc, logger, quit)
go txHashHandler.Run(rootCtx)
// create a new publisher.
publisher := pipeline.NewPublisher(pushFunc, repository, config.P2pNetwork, logger)
publisher := pipeline.NewPublisher(pushFunc, repository, config.P2pNetwork, txHashHandler, logger)
watcher := watcher.NewWatcher(rootCtx, db.Database, config.MongoDatabase, publisher.Publish, logger)
err = watcher.Start(rootCtx)
if err != nil {
@ -95,11 +100,15 @@ func main() {
logger.Info("root context cancelled, exiting...")
rootCtxCancel()
logger.Info("Closing tx hash handler ...")
close(quit)
logger.Info("Closing database connections ...")
db.Close()
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-pipeline")
}
func newAwsConfig(appCtx context.Context, cfg *config.Configuration) (aws.Config, error) {

View File

@ -16,6 +16,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.1.1
github.com/aws/aws-sdk-go-v2/credentials v1.1.1
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2
github.com/golang/mock v1.6.0
github.com/test-go/testify v1.1.4
github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-00010101000000-000000000000
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230413131841-4f1feb38999a
)
@ -32,6 +34,7 @@ require (
github.com/aws/smithy-go v1.13.5 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/ethereum/go-ethereum v1.10.21 // indirect
github.com/golang/snappy v0.0.4 // indirect
@ -43,6 +46,7 @@ require (
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
@ -60,6 +64,7 @@ require (
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
// Needed for cosmos-sdk based chains. See

View File

@ -44,6 +44,8 @@ github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3
github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4=
github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@ -98,6 +100,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
@ -169,6 +173,7 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -176,6 +181,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,65 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: repository.go
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
pipeline "github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline"
)
// MockIRepository is a mock of IRepository interface.
type MockIRepository struct {
ctrl *gomock.Controller
recorder *MockIRepositoryMockRecorder
}
// MockIRepositoryMockRecorder is the mock recorder for MockIRepository.
type MockIRepositoryMockRecorder struct {
mock *MockIRepository
}
// NewMockIRepository creates a new mock instance.
func NewMockIRepository(ctrl *gomock.Controller) *MockIRepository {
mock := &MockIRepository{ctrl: ctrl}
mock.recorder = &MockIRepositoryMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockIRepository) EXPECT() *MockIRepositoryMockRecorder {
return m.recorder
}
// GetVaaIdTxHash mocks base method.
func (m *MockIRepository) GetVaaIdTxHash(ctx context.Context, id string) (*pipeline.VaaIdTxHash, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetVaaIdTxHash", ctx, id)
ret0, _ := ret[0].(*pipeline.VaaIdTxHash)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetVaaIdTxHash indicates an expected call of GetVaaIdTxHash.
func (mr *MockIRepositoryMockRecorder) GetVaaIdTxHash(ctx, id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVaaIdTxHash", reflect.TypeOf((*MockIRepository)(nil).GetVaaIdTxHash), ctx, id)
}
// UpdateVaaDocTxHash mocks base method.
func (m *MockIRepository) UpdateVaaDocTxHash(ctx context.Context, id, txhash string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateVaaDocTxHash", ctx, id, txhash)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateVaaDocTxHash indicates an expected call of UpdateVaaDocTxHash.
func (mr *MockIRepositoryMockRecorder) UpdateVaaDocTxHash(ctx, id, txhash interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateVaaDocTxHash", reflect.TypeOf((*MockIRepository)(nil).UpdateVaaDocTxHash), ctx, id, txhash)
}

View File

@ -2,8 +2,6 @@ package pipeline
import (
"context"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
@ -14,19 +12,22 @@ import (
// Publisher definition.
type Publisher struct {
logger *zap.Logger
pushFunc topic.PushFunc
repository *Repository
p2pNetwork string
logger *zap.Logger
pushFunc topic.PushFunc
repository *Repository
p2pNetwork string
txHashHandler *TxHashHandler
}
// NewPublisher creates a new publisher for vaa with parse configuration.
func NewPublisher(pushFunc topic.PushFunc, repository *Repository, p2pNetwork string, logger *zap.Logger) *Publisher {
func NewPublisher(pushFunc topic.PushFunc, repository *Repository, p2pNetwork string, txHashHandler *TxHashHandler, logger *zap.Logger) *Publisher {
return &Publisher{
logger: logger,
repository: repository,
pushFunc: pushFunc,
p2pNetwork: p2pNetwork}
logger: logger,
repository: repository,
pushFunc: pushFunc,
p2pNetwork: p2pNetwork,
txHashHandler: txHashHandler,
}
}
// Publish sends a Event for the vaa that has parse configuration defined.
@ -56,12 +57,11 @@ func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) {
// discard pyth messages
isPyth := domain.P2pMainNet == p.p2pNetwork && vaa.ChainIDPythNet == vaa.ChainID(e.ChainID)
if !isPyth {
// retry 3 times with 2 seconds delay fixing the vaa with empty txhash.
txHash, err := Retry(p.handleEmptyVaaTxHash, 3, 2*time.Second)(ctx, e.ID)
if err != nil {
p.logger.Error("can not get txhash for vaa", zap.Error(err), zap.String("event", event.ID))
}
event.TxHash = txHash
// add the event to the txhash handler.
// the handler will try to get the txhash for the vaa
// and publish the event with the txhash.
p.txHashHandler.AddVaaFixItem(event)
return
}
}
@ -71,40 +71,3 @@ func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) {
p.logger.Error("can not push event to topic", zap.Error(err), zap.String("event", event.ID))
}
}
// handleEmptyVaaTxHash tries to get the txhash for the vaa with the given id.
func (p *Publisher) handleEmptyVaaTxHash(ctx context.Context, id string) (string, error) {
vaaIdTxHash, err := p.repository.GetVaaIdTxHash(ctx, id)
if err != nil {
return "", err
}
if vaaIdTxHash.TxHash == "" {
return "", fmt.Errorf("txhash for vaa (%s) is empty", id)
}
err = p.repository.UpdateVaaDocTxHash(ctx, id, vaaIdTxHash.TxHash)
if err != nil {
return "", err
}
return vaaIdTxHash.TxHash, nil
}
// RetryFn is a function that can be retried.
type RetryFn func(ctx context.Context, id string) (string, error)
// Retry retries a function.
func Retry(retryFn RetryFn, retries int, delay time.Duration) RetryFn {
return func(ctx context.Context, id string) (string, error) {
var err error
var txHash string
for i := 0; i <= retries; i++ {
txHash, err = retryFn(ctx, id)
if err == nil {
return txHash, nil
}
time.Sleep(delay)
}
return txHash, err
}
}

View File

@ -1,3 +1,4 @@
//go:generate mockgen -source=repository.go -destination=mocks/repository.go -package=mocks
package pipeline
import (
@ -10,6 +11,12 @@ import (
"go.uber.org/zap"
)
// Interface
type IRepository interface {
GetVaaIdTxHash(ctx context.Context, id string) (*VaaIdTxHash, error)
UpdateVaaDocTxHash(ctx context.Context, id string, txhash string) error
}
// Repository is the repository data access layer.
type Repository struct {
db *mongo.Database

View File

@ -0,0 +1,62 @@
package tests
import (
"context"
"fmt"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/test-go/testify/assert"
"github.com/test-go/testify/require"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline/mocks"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)
func TestNewTxHashHandler(t *testing.T) {
mock := gomock.NewController(t)
defer mock.Finish()
repo := mocks.NewMockIRepository(mock)
// log, _ := zap.NewDevelopment()
observedZapCore, observedLogs := observer.New(zap.InfoLevel)
observedLogger := zap.New(observedZapCore)
quit := make(chan bool)
var f = topic.PushFunc(func(context.Context, *topic.Event) error {
return nil
})
txHashHandler := pipeline.NewTxHashHandler(repo, f, observedLogger, quit)
txHashHandler.AddVaaFixItem(topic.Event{
ID: "vaa1",
},
)
ctx := context.Background()
repo.EXPECT().GetVaaIdTxHash(ctx, "vaa1").Return(nil, fmt.Errorf("error"))
repo.EXPECT().GetVaaIdTxHash(ctx, "vaa1").Return(&pipeline.VaaIdTxHash{
ChainID: 1,
TxHash: "0xbabla",
}, nil)
repo.EXPECT().UpdateVaaDocTxHash(ctx, "vaa1", "0xbabla").Return(nil)
go txHashHandler.Run(ctx)
time.Sleep(6 * time.Second)
close(quit)
require.Equal(t, 3, observedLogs.Len())
allLogs := observedLogs.All()
// first attempt to get txhash should fail
assert.Equal(t, "Error while trying to fix vaa txhash", allLogs[1].Message)
// second attempt to get txhash should succeed
assert.Equal(t, "Vaa txhash fixed", allLogs[2].Message)
}

View File

@ -0,0 +1,101 @@
package pipeline
import (
"context"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
"go.uber.org/zap"
)
type ItemTuple struct {
Retries int
Event topic.Event
}
type TxHashHandler struct {
logger *zap.Logger
repository IRepository
fixItems map[string]ItemTuple
inputQueue chan topic.Event
quit chan bool
sleepTime time.Duration
pushFunc topic.PushFunc
defaultRetries int
}
func NewTxHashHandler(repository IRepository, pushFunc topic.PushFunc, logger *zap.Logger, quit chan bool) *TxHashHandler {
return &TxHashHandler{
logger: logger,
repository: repository,
fixItems: map[string]ItemTuple{},
inputQueue: make(chan topic.Event, 100),
sleepTime: 2 * time.Second,
pushFunc: pushFunc,
defaultRetries: 3,
}
}
// Add a new element to the fixItems array
func (t *TxHashHandler) AddVaaFixItem(event topic.Event) {
t.inputQueue <- event
}
func (t *TxHashHandler) Run(ctx context.Context) {
t.logger.Info("TxHashHandler started")
for {
select {
case <-t.quit:
t.logger.Info("stopping txhash handler")
return
case event := <-t.inputQueue:
t.fixItems[event.ID] = ItemTuple{
Retries: 3,
Event: event,
}
default:
// no lock needed. the map is never updated while iterating.
for vaa, item := range t.fixItems {
if item.Retries > 0 {
txHash, err := t.handleEmptyVaaTxHash(ctx, vaa)
if err != nil {
t.logger.Error("Error while trying to fix vaa txhash", zap.Int("retries_count", item.Retries), zap.Error(err))
item.Retries = item.Retries - 1
t.fixItems[vaa] = item
} else {
t.logger.Info("Vaa txhash fixed", zap.String("vaaID", vaa), zap.String("txHash", txHash))
item.Event.TxHash = txHash
t.pushFunc(ctx, &item.Event)
delete(t.fixItems, vaa)
}
} else {
t.logger.Error("Vaa txhash fix failed", zap.String("vaaID", vaa))
// publish the event to the topic anyway
t.pushFunc(ctx, &item.Event)
delete(t.fixItems, vaa)
}
}
}
time.Sleep(t.sleepTime)
}
}
// handleEmptyVaaTxHash tries to get the txhash for the vaa with the given id.
func (p *TxHashHandler) handleEmptyVaaTxHash(ctx context.Context, id string) (string, error) {
vaaIdTxHash, err := p.repository.GetVaaIdTxHash(ctx, id)
if err != nil {
return "", err
}
if vaaIdTxHash.TxHash == "" {
return "", fmt.Errorf("txhash for vaa (%s) is empty", id)
}
err = p.repository.UpdateVaaDocTxHash(ctx, id, vaaIdTxHash.TxHash)
if err != nil {
return "", err
}
return vaaIdTxHash.TxHash, nil
}