Compare commits

...

2 Commits

Author SHA1 Message Date
Fernando Torres ac862cde08
wip 2024-04-26 17:43:31 -03:00
Agustin Pazos d8dcd1bebe wip 2024-04-26 14:56:04 -03:00
35 changed files with 932 additions and 70 deletions

View File

@ -35,3 +35,5 @@ PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=
GUARDIAN_PROVIDER_JSON=

View File

@ -39,3 +39,4 @@ PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=
GUARDIAN_PROVIDER_JSON=

View File

@ -35,3 +35,4 @@ PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=
GUARDIAN_PROVIDER_JSON=

View File

@ -39,3 +39,4 @@ PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=
GUARDIAN_PROVIDER_JSON=

View File

@ -0,0 +1,9 @@
---
kind: Secret
apiVersion: v1
metadata:
name: guardian-provider
namespace: {{ .NAMESPACE }}
type: Opaque
data:
guardian-provider.json: {{ .GUARDIAN_PROVIDER_JSON | b64enc }}

View File

@ -0,0 +1,9 @@
---
kind: ConfigMap
apiVersion: v1
metadata:
name: fly-event-processor
namespace: {{ .NAMESPACE }}
data:
aws-region: {{ .SQS_AWS_REGION }}
duplicate-vaa-sqs-url: {{ .DUPLICATE_VAA_SQS_URL }}

View File

@ -0,0 +1,17 @@
ENVIRONMENT=production-mainnet
NAMESPACE=wormscan
NAME=wormscan-fly-event-processor
REPLICAS=2
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
DUPLICATE_VAA_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
PPROF_ENABLED=false
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1

View File

@ -0,0 +1,17 @@
ENVIRONMENT=production-testnet
NAMESPACE=wormscan-testnet
NAME=wormscan-fly-event-processor
REPLICAS=1
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=30Mi
RESOURCES_LIMITS_CPU=20m
RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
DUPLICATE_VAA_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=testnet
PPROF_ENABLED=false
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1

View File

@ -0,0 +1,17 @@
ENVIRONMENT=staging-mainnet
NAMESPACE=wormscan
NAME=wormscan-fly-event-processor
REPLICAS=2
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
DUPLICATE_VAA_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
PPROF_ENABLED=true
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1

View File

@ -0,0 +1,17 @@
ENVIRONMENT=staging-testnet
NAMESPACE=wormscan-testnet
NAME=wormscan-fly-event-processor
REPLICAS=1
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=30Mi
RESOURCES_LIMITS_CPU=20m
RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
DUPLICATE_VAA_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=testnet
PPROF_ENABLED=false
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1

View File

@ -0,0 +1,27 @@
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: keda-auth-aws-{{ .NAME }}
namespace: {{ .NAMESPACE }}
spec:
podIdentity:
provider: aws
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{ .NAME }}
namespace: {{ .NAMESPACE }}
spec:
scaleTargetRef:
name: {{ .NAME }}
minReplicaCount: 2
maxReplicaCount: 10
triggers:
- type: aws-sqs-queue
authenticationRef:
name: keda-auth-aws-{{ .NAME }}
metadata:
awsRegion: {{ .SQS_AWS_REGION }}
queueURL: {{ .DUPLICATE_VAA_SQS_URL }}
queueLength: "5"

View File

@ -0,0 +1,102 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .NAME }}
namespace: {{ .NAMESPACE }}
spec:
replicas: {{ .REPLICAS }}
selector:
matchLabels:
app: {{ .NAME }}
template:
metadata:
labels:
app: {{ .NAME }}
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8000"
spec:
restartPolicy: Always
terminationGracePeriodSeconds: 40
serviceAccountName: fly-events-processor
containers:
- name: {{ .NAME }}
image: {{ .IMAGE_NAME }}
imagePullPolicy: Always
volumeMounts:
- name: fly-event-processor-config
mountPath: /opt/fly-event-processor
readinessProbe:
initialDelaySeconds: 30
periodSeconds: 20
timeoutSeconds: 3
failureThreshold: 3
httpGet:
path: /api/ready
port: 8000
livenessProbe:
initialDelaySeconds: 30
periodSeconds: 30
timeoutSeconds: 3
failureThreshold: 3
httpGet:
path: /api/health
port: 8000
env:
- name: ENVIRONMENT
value: {{ .ENVIRONMENT }}
- name: PORT
value: "8000"
- name: LOG_LEVEL
value: "INFO"
- name: MONGODB_URI
valueFrom:
secretKeyRef:
name: mongodb
key: mongo-uri
- name: MONGODB_DATABASE
valueFrom:
configMapKeyRef:
name: config
key: mongo-database
- name: DUPLICATE_VAA_SQS_URL
valueFrom:
configMapKeyRef:
name: fly-event-processor
key: duplicate-vaa-sqs-url
- name: AWS_REGION
valueFrom:
configMapKeyRef:
name: fly-event-processor
key: aws-region
- name: PPROF_ENABLED
value: "{{ .PPROF_ENABLED }}"
- name: P2P_NETWORK
value: {{ .P2P_NETWORK }}
- name: ALERT_ENABLED
value: "{{ .ALERT_ENABLED }}"
- name: ALERT_API_KEY
valueFrom:
secretKeyRef:
name: opsgenie
key: api-key
- name: METRICS_ENABLED
value: "{{ .METRICS_ENABLED }}"
- name: CONSUMER_WORKER_SIZE
value: "{{ .CONSUMER_WORKER_SIZE }}"
- name: GUARDIAN_API_PROVIDER_PATH
value: "/opt/fly-event-processor/guardian-provider.json"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}
cpu: {{ .RESOURCES_LIMITS_CPU }}
requests:
memory: {{ .RESOURCES_REQUESTS_MEMORY }}
cpu: {{ .RESOURCES_REQUESTS_CPU }}
volumes:
- name: fly-event-processor-config
secret:
secretName: guardian-provider
items:
- key: guardian-provider.json
path: guardian-provider.json

View File

@ -0,0 +1,7 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: fly-events-processor
namespace: {{ .NAMESPACE }}
annotations:
eks.amazonaws.com/role-arn: {{ .AWS_IAM_ROLE }}

View File

@ -17,19 +17,19 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/config"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer"
consumerRepo "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
)
type exitCode int
func Run() {
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
@ -48,7 +48,6 @@ func Run() {
// create guardian provider pool
guardianApiProviderPool, err := newGuardianProviderPool(cfg)
_, err = newGuardianProviderPool(cfg)
if err != nil {
logger.Fatal("Error creating guardian provider pool: ", zap.Error(err))
}
@ -59,19 +58,24 @@ func Run() {
log.Fatal("Failed to initialize MongoDB client: ", err)
}
repository := consumerRepo.NewRepository(logger, db.Database)
// create a new repository
repository := storage.NewRepository(logger, db.Database)
// create a new processor
processor := processor.NewProcessor(guardianApiProviderPool, repository, logger, metrics)
// start serving /health and /ready endpoints
healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database)
if err != nil {
logger.Fatal("Failed to create health checks", zap.Error(err))
}
server := infrastructure.NewServer(logger, cfg.Port, cfg.PprofEnabled, healthChecks...)
vaaCtrl := vaa.NewController(processor.Process, repository, logger)
server := infrastructure.NewServer(logger, cfg.Port, vaaCtrl, cfg.PprofEnabled, healthChecks...)
server.Start()
// create and start a duplicate VAA consumer.
duplicateVaaConsumeFunc := newDuplicateVaaConsumeFunc(rootCtx, cfg, metrics, logger)
duplicateVaa := consumer.New(duplicateVaaConsumeFunc, guardianApiProviderPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize)
duplicateVaa := consumer.New(duplicateVaaConsumeFunc, processor.Process, logger, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize)
duplicateVaa.Start(rootCtx)
logger.Info("Started wormholescan-fly-event-processor")

View File

@ -5,16 +5,18 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// Consumer consumer struct definition.
type Consumer struct {
consumeFunc queue.ConsumeFunc
processor processor.ProcessorFunc
guardianPool *pool.Pool
logger *zap.Logger
repository *Repository
metrics metrics.Metrics
p2pNetwork string
workersSize int
@ -23,23 +25,20 @@ type Consumer struct {
// New creates a new vaa consumer.
func New(
consumeFunc queue.ConsumeFunc,
guardianPool *pool.Pool,
ctx context.Context,
processor processor.ProcessorFunc,
logger *zap.Logger,
repository *Repository,
metrics metrics.Metrics,
p2pNetwork string,
workersSize int,
) *Consumer {
c := Consumer{
consumeFunc: consumeFunc,
guardianPool: guardianPool,
logger: logger,
repository: repository,
metrics: metrics,
p2pNetwork: p2pNetwork,
workersSize: workersSize,
consumeFunc: consumeFunc,
processor: processor,
logger: logger,
metrics: metrics,
p2pNetwork: p2pNetwork,
workersSize: workersSize,
}
return &c
@ -54,17 +53,47 @@ func (c *Consumer) Start(ctx context.Context) {
}
func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage) {
for {
select {
case <-ctx.Done():
return
case msg := <-ch:
c.processEvent(ctx, *msg.Data())
c.processEvent(ctx, msg)
}
}
}
func (c *Consumer) processEvent(ctx context.Context, event queue.Event) {
//TODO
func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage) {
event := msg.Data()
vaaID := event.Data.VaaID
chainID := sdk.ChainID(event.Data.ChainID)
logger := c.logger.With(
zap.String("trackId", event.TrackID),
zap.String("vaaId", vaaID))
if msg.IsExpired() {
msg.Failed()
logger.Debug("event is expired")
c.metrics.IncDuplicatedVaaExpired(chainID)
return
}
params := &processor.Params{
TrackID: event.TrackID,
VaaID: vaaID,
ChainID: chainID,
}
err := c.processor(ctx, params)
if err != nil {
msg.Failed()
logger.Error("error processing event", zap.Error(err))
c.metrics.IncDuplicatedVaaFailed(chainID)
return
}
msg.Done()
logger.Debug("event processed")
c.metrics.IncDuplicatedVaaProcessed(chainID)
}

View File

@ -1,24 +0,0 @@
package consumer
import (
commonRepo "github.com/wormhole-foundation/wormhole-explorer/common/repository"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Repository exposes operations over the `globalTransactions` collection.
type Repository struct {
logger *zap.Logger
vaas *mongo.Collection
duplicateVaas *mongo.Collection
}
// New creates a new repository.
func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository {
r := Repository{
logger: logger,
vaas: db.Collection(commonRepo.Vaas),
duplicateVaas: db.Collection(commonRepo.DuplicateVaas),
}
return &r
}

View File

@ -9,14 +9,20 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.13.15
github.com/gofiber/fiber/v2 v2.52.4
github.com/joho/godotenv v1.5.1
github.com/prometheus/client_golang v1.16.0
github.com/sethvargo/go-envconfig v1.0.0
github.com/spf13/cobra v1.8.0
github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20240422172607-688a0d0f718e
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240416174455-25e60611a867
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/zap v1.27.0
golang.org/x/net v0.21.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
)
require (
github.com/algorand/go-algorand-sdk v1.23.0 // indirect
github.com/algorand/go-codec/codec v1.1.8 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29 // indirect
@ -31,12 +37,13 @@ require (
github.com/aws/smithy-go v1.13.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/certusone/wormhole/node v0.0.0-20240416174455-25e60611a867 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/ethereum/go-ethereum v1.10.21 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/gofiber/adaptor/v2 v2.2.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
@ -52,9 +59,8 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/onsi/gomega v1.30.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
@ -63,14 +69,12 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240416174455-25e60611a867 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect

View File

@ -1,3 +1,8 @@
github.com/algorand/go-algorand-sdk v1.23.0 h1:wlEV6OgDVc/sLeF2y41bwNG/Lr8EoMnN87Ur8N2Gyyo=
github.com/algorand/go-algorand-sdk v1.23.0/go.mod h1:7i2peZBcE48kfoxNZnLA+mklKh812jBKvQ+t4bn0KBQ=
github.com/algorand/go-codec v1.1.8/go.mod h1:XhzVs6VVyWMLu6cApb9/192gBjGRVGm5cX5j203Heg4=
github.com/algorand/go-codec/codec v1.1.8 h1:lsFuhcOH2LiEhpBH3BVUUkdevVmwCRyvb7FCAAPeY6U=
github.com/algorand/go-codec/codec v1.1.8/go.mod h1:tQ3zAJ6ijTps6V+wp8KsGDnPC2uhHVC7ANyrtkIY0bA=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
@ -35,11 +40,16 @@ github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/btcsuite/btcd v0.22.1 h1:CnwP9LM/M9xuRrGSCGeMVs9iv09uMqwsVX7EeIpgV2c=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/certusone/wormhole/node v0.0.0-20240416174455-25e60611a867 h1:Wdd/ZJuGD3logxkNuT3hA2aq0Uk5uDGMGhca+S1CDnM=
github.com/certusone/wormhole/node v0.0.0-20240416174455-25e60611a867/go.mod h1:vJHIhQ0MeHZfQ4OpGiUCm3LD3nrdfT1CEIh2JaPCCso=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cosmos/btcutil v1.0.5 h1:t+ZFcX77LpKtDBhjucvnOH8C2l2ioGsBNEQ3jef8xFk=
github.com/cosmos/btcutil v1.0.5/go.mod h1:IyB7iuqZMJlthe2tkIFL33xPyzbFYP0XVdS8P5lUPis=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -56,7 +66,6 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3ol3teJY=
github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
@ -125,10 +134,11 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -159,6 +169,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@ -181,6 +192,7 @@ github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.11.2 h1:+1v2rDQUWNcGW7/7E0Jvdz51V38XXxJfhzbV17aNHCw=
go.mongodb.org/mongo-driver v1.11.2/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
@ -197,12 +209,14 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
@ -226,10 +240,10 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
@ -251,19 +265,26 @@ golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,20 @@
package guardian
import (
client "github.com/wormhole-foundation/wormhole-explorer/common/client/guardian"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
)
// GuardianAPIClient is a wrapper around the Guardian API client and the pool of providers.
type GuardianAPIClient struct {
Client *client.GuardianAPIClient
Pool *pool.Pool
}
// NewGuardianAPIClient creates a new Guardian API client.
func NewGuardianAPIClient(client *client.GuardianAPIClient, pool *pool.Pool) *GuardianAPIClient {
return &GuardianAPIClient{
Client: client,
Pool: pool,
}
}

View File

@ -5,6 +5,7 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/pprof"
health "github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa"
"go.uber.org/zap"
)
@ -14,7 +15,7 @@ type Server struct {
logger *zap.Logger
}
func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...health.Check) *Server {
func NewServer(logger *zap.Logger, port string, vaaController *vaa.Controller, pprofEnabled bool, checks ...health.Check) *Server {
app := fiber.New(fiber.Config{DisableStartupMessage: true})
prometheus := fiberprometheus.New("wormscan-fly-event-processor")
prometheus.RegisterAt(app, "/metrics")
@ -29,7 +30,7 @@ func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...hea
api := app.Group("/api")
api.Get("/health", ctrl.HealthCheck)
api.Get("/ready", ctrl.ReadyCheck)
api.Post("/vaa/duplicated", vaaController.Process)
return &Server{
app: app,
port: port,

View File

@ -0,0 +1,55 @@
package vaa
import (
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage"
"go.uber.org/zap"
)
// Controller definition.
type Controller struct {
logger *zap.Logger
repository *storage.Repository
processor processor.ProcessorFunc
}
// NewController creates a Controller instance.
// func NewController(repository *Repository, processor processor.ProcessorFunc, logger *zap.Logger) *Controller {
func NewController(processor processor.ProcessorFunc, repository *storage.Repository, logger *zap.Logger) *Controller {
return &Controller{processor: processor, repository: repository, logger: logger}
}
// Process processes the VAA message.
func (c *Controller) Process(ctx *fiber.Ctx) error {
request := struct {
VaaID string `json:"vaaId"`
}{}
if err := ctx.BodyParser(&request); err != nil {
c.logger.Error("error parsing request", zap.Error(err))
return err
}
vaa, err := c.repository.FindVAAById(ctx.Context(), request.VaaID)
if err != nil {
c.logger.Error("error getting vaa from collection", zap.Error(err))
return err
}
params := processor.Params{
TrackID: fmt.Sprintf("controller-%s", request.VaaID),
VaaID: request.VaaID,
ChainID: vaa.EmitterChain,
}
err = c.processor(ctx.Context(), &params)
if err != nil {
c.logger.Error("error processing vaa", zap.Error(err))
return err
}
return ctx.JSON(fiber.Map{"message": "success"})
}

View File

@ -1,5 +1,7 @@
package metrics
import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
// DummyMetrics is a dummy implementation of Metric interface.
type DummyMetrics struct{}
@ -7,3 +9,18 @@ type DummyMetrics struct{}
func NewDummyMetrics() *DummyMetrics {
return &DummyMetrics{}
}
// IncDuplicatedVaaConsumedQueue dummy implementation.
func (d *DummyMetrics) IncDuplicatedVaaConsumedQueue() {}
// IncDuplicatedVaaProcessed dummy implementation.
func (d *DummyMetrics) IncDuplicatedVaaProcessed(chainID sdk.ChainID) {}
// IncDuplicatedVaaFailed dummy implementation.
func (d *DummyMetrics) IncDuplicatedVaaFailed(chainID sdk.ChainID) {}
// IncDuplicatedVaaExpired dummy implementation.
func (d *DummyMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) {}
// IncDuplicatedVaaCanNotFixed dummy implementation.
func (d *DummyMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) {}

View File

@ -1,6 +1,13 @@
package metrics
const serviceName = "wormholescan-fly-event-processor"
import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
const serviceName = "wormscan-fly-event-processor"
type Metrics interface {
IncDuplicatedVaaConsumedQueue()
IncDuplicatedVaaProcessed(chainID sdk.ChainID)
IncDuplicatedVaaFailed(chainID sdk.ChainID)
IncDuplicatedVaaExpired(chainID sdk.ChainID)
IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID)
}

View File

@ -1,10 +1,51 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// PrometheusMetrics is a Prometheus implementation of Metric interface.
type PrometheusMetrics struct {
duplicatedVaaCount *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
func NewPrometheusMetrics(environment string) *PrometheusMetrics {
return &PrometheusMetrics{}
return &PrometheusMetrics{
duplicatedVaaCount: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormscan_fly_event_processor_duplicated_vaa_count",
Help: "The total number of duplicated VAA processed",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"chain", "type"}),
}
}
func (m *PrometheusMetrics) IncDuplicatedVaaConsumedQueue() {
m.duplicatedVaaCount.WithLabelValues("all", "consumed_queue").Inc()
}
func (m *PrometheusMetrics) IncDuplicatedVaaProcessed(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "processed").Inc()
}
func (m *PrometheusMetrics) IncDuplicatedVaaFailed(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "failed").Inc()
}
func (m *PrometheusMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "expired").Inc()
}
func (m *PrometheusMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "can_not_fixed").Inc()
}

View File

@ -0,0 +1,220 @@
package processor
import (
"errors"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/guardian"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"golang.org/x/net/context"
)
type Processor struct {
guardianPool *pool.Pool
repository *storage.Repository
logger *zap.Logger
metrics metrics.Metrics
}
func NewProcessor(
guardianPool *pool.Pool,
repository *storage.Repository,
logger *zap.Logger,
metrics metrics.Metrics,
) *Processor {
return &Processor{
guardianPool: guardianPool,
repository: repository,
logger: logger,
metrics: metrics,
}
}
func (p *Processor) Process(ctx context.Context, params *Params) error {
logger := p.logger.With(
zap.String("trackId", params.TrackID),
zap.String("vaaId", params.VaaID))
// 1. check if the vaa stored in the VAA collections is the correct one.
// 1.1 get vaa from Vaas collection
vaaDoc, err := p.repository.FindVAAById(ctx, params.VaaID)
if err != nil {
logger.Error("error getting vaa from collection", zap.Error(err))
return err
}
// 1.2 if the event time has not reached the finality time, the event fail and
// will be reprocesed on the next retry.
finalityTime := getFinalityTimeByChainID(params.ChainID)
if vaaDoc.Timestamp == nil {
logger.Error("vaa timestamp is nil")
return errors.New("vaa timestamp is nil")
}
vaaTimestamp := *vaaDoc.Timestamp
reachedFinalityTime := time.Now().After(vaaTimestamp.Add(finalityTime))
if !reachedFinalityTime {
logger.Debug("event time has not reached the finality time",
zap.Time("finalityTime", vaaTimestamp.Add(finalityTime)))
return errors.New("event time has not reached the finality time")
}
// 1.3 call guardian api to get signed_vaa.
guardians := p.guardianPool.GetItems()
var signedVaa *guardian.SignedVaa
for _, g := range guardians {
g.Wait(ctx)
guardianAPIClient, err := guardian.NewGuardianAPIClient(
guardian.DefaultTimeout,
g.Id,
logger)
if err != nil {
logger.Error("error creating guardian api client", zap.Error(err))
continue
}
signedVaa, err = guardianAPIClient.GetSignedVAA(params.VaaID)
if err != nil {
logger.Error("error getting signed vaa from guardian api", zap.Error(err))
continue
}
break
}
if signedVaa == nil {
logger.Error("error getting signed vaa from guardian api")
return errors.New("error getting signed vaa from guardian api")
}
// 1.4 compare digest from vaa and signedVaa
guardianVAA, err := sdk.Unmarshal(signedVaa.VaaBytes)
if err != nil {
logger.Error("error unmarshalling guardian signed vaa", zap.Error(err))
return err
}
vaa, err := sdk.Unmarshal(vaaDoc.Vaa)
if err != nil {
logger.Error("error unmarshalling vaa", zap.Error(err))
return err
}
// If the guardian digest is the same that the vaa digest,
// the stored vaa in the vaas collection is the correct one.
if guardianVAA.HexDigest() == vaa.HexDigest() {
logger.Debug("vaa stored in Vaas collections is the correct")
return nil
}
// 2. Check for each duplicate VAAs to detect which is the correct one.
// 2.1 This check is necessary to avoid race conditions when the vaa is processed
if vaaDoc.TxHash == "" {
logger.Error("vaa txHash is empty")
return errors.New("vaa txHash is empty")
}
// 2.2 Get all duplicate vaas by vaaId
duplicateVaaDocs, err := p.repository.FindDuplicateVAAs(ctx, params.VaaID)
if err != nil {
logger.Error("error getting duplicate vaas from collection", zap.Error(err))
return err
}
// 2.3 Check each duplicate VAA to detect which is the correct one.
for _, duplicateVaaDoc := range duplicateVaaDocs {
duplicateVaa, err := sdk.Unmarshal(duplicateVaaDoc.Vaa)
if err != nil {
logger.Error("error unmarshalling vaa", zap.Error(err))
return err
}
if guardianVAA.HexDigest() == duplicateVaa.HexDigest() {
err := p.repository.FixVAA(ctx, params.VaaID, duplicateVaaDoc.ID)
if err != nil {
logger.Error("error fixing vaa", zap.Error(err))
return err
}
logger.Debug("vaa fixed")
return nil
}
}
logger.Debug("can't fix duplicate vaa")
p.metrics.IncDuplicatedVaaCanNotFixed(params.ChainID)
return errors.New("can't fix duplicate vaa")
}
func getFinalityTimeByChainID(chainID sdk.ChainID) time.Duration {
// Time to finalize for each chain.
// ref: https://docs.wormhole.com/wormhole/reference/constants
switch chainID {
case sdk.ChainIDSolana:
return 14 * time.Second
case sdk.ChainIDEthereum:
return 975 * time.Second
case sdk.ChainIDTerra:
return 6 * time.Second
case sdk.ChainIDBSC:
return 48 * time.Second
case sdk.ChainIDPolygon:
return 66 * time.Second
case sdk.ChainIDAvalanche:
return 2 * time.Second
case sdk.ChainIDOasis:
return 12 * time.Second
case sdk.ChainIDAlgorand:
return 4 * time.Second
case sdk.ChainIDFantom:
return 5 * time.Second
case sdk.ChainIDKarura:
return 24 * time.Second
case sdk.ChainIDAcala:
return 24 * time.Second
case sdk.ChainIDKlaytn:
return 1 * time.Second
case sdk.ChainIDCelo:
return 10 * time.Second
case sdk.ChainIDNear:
return 2 * time.Second
case sdk.ChainIDMoonbeam:
return 24 * time.Second
case sdk.ChainIDTerra2:
return 6 * time.Second
case sdk.ChainIDInjective:
return 3 * time.Second
case sdk.ChainIDSui:
return 3 * time.Second
case sdk.ChainIDAptos:
return 4 * time.Second
case sdk.ChainIDArbitrum:
return 1066 * time.Second
case sdk.ChainIDOptimism:
return 1026 * time.Second
case sdk.ChainIDXpla:
return 5 * time.Second
case sdk.ChainIDBase:
return 1026 * time.Second
case sdk.ChainIDSei:
return 1 * time.Second
case sdk.ChainIDWormchain:
return 5 * time.Second
case sdk.ChainIDSepolia:
return 975 * time.Second
case sdk.ChainIDArbitrumSepolia:
return 1066 * time.Second
case sdk.ChainIDBaseSepolia:
return 1026 * time.Second
case sdk.ChainIDOptimismSepolia:
return 1026 * time.Second
case sdk.ChainIDHolesky:
return 975 * time.Second
default:
// The default value is the max finality time.
return 1066 * time.Second
}
}

View File

@ -0,0 +1,15 @@
package processor
import (
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"golang.org/x/net/context"
)
type Params struct {
TrackID string
VaaID string
ChainID sdk.ChainID
}
// ProcessorFunc is a function to process vaa message.
type ProcessorFunc func(context.Context, *Params) error

View File

@ -59,6 +59,8 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
q.logger.Debug("Received messages from SQS", zap.Int("count", len(messages)))
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
for _, msg := range messages {
q.metrics.IncDuplicatedVaaConsumedQueue()
// unmarshal body to sqsEvent
var sqsEvent sqsEvent
err := json.Unmarshal([]byte(*msg.Body), &sqsEvent)
@ -88,7 +90,6 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}
// TODO add metrics to DuplicateVAA consume
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
@ -140,12 +141,10 @@ func (m *sqsConsumerMessage) Done() {
zap.Error(err),
)
}
// TODO add metrics to DuplicateVAA consume
m.wg.Done()
}
func (m *sqsConsumerMessage) Failed() {
// TODO add metrics to duplicateVAA consume failed
m.wg.Done()
}

View File

@ -13,13 +13,15 @@ type sqsEvent struct {
// Event represents a event data to be handle.
type Event struct {
Type string `json:"type"`
Source string `json:"source"`
Data DuplicateVaa `json:"data"`
TrackID string `json:"trackId"`
Type string `json:"type"`
Source string `json:"source"`
Data DuplicateVaa `json:"data"`
}
type DuplicateVaa struct {
VaaID string `json:"vaaId"`
ChainID uint16 `json:"chainId"`
Version uint8 `json:"version"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`

View File

@ -0,0 +1,127 @@
package storage
import (
"context"
commonRepo "github.com/wormhole-foundation/wormhole-explorer/common/repository"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
"gopkg.in/mgo.v2/bson"
)
// Repository exposes operations over the `globalTransactions` collection.
type Repository struct {
logger *zap.Logger
vaas *mongo.Collection
duplicateVaas *mongo.Collection
}
// New creates a new repository.
func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository {
r := Repository{
logger: logger,
vaas: db.Collection(commonRepo.Vaas),
duplicateVaas: db.Collection(commonRepo.DuplicateVaas),
}
return &r
}
// FindVAAById find a vaa by id.
func (r *Repository) FindVAAById(ctx context.Context, vaaID string) (*VaaDoc, error) {
var vaaDoc VaaDoc
err := r.vaas.FindOne(ctx, bson.M{"_id": vaaID}).Decode(&vaaDoc)
return &vaaDoc, err
}
// FindDuplicateVAAById find a duplicate vaa by id.
func (r *Repository) FindDuplicateVAAById(ctx context.Context, id string) (*DuplicateVaaDoc, error) {
var duplicateVaaDoc DuplicateVaaDoc
err := r.duplicateVaas.FindOne(ctx, bson.M{"_id": id}).Decode(&duplicateVaaDoc)
return &duplicateVaaDoc, err
}
// FindDuplicateVAAs find duplicate vaas by vaa id.
func (r *Repository) FindDuplicateVAAs(ctx context.Context, vaaID string) ([]DuplicateVaaDoc, error) {
var duplicateVaaDocs []DuplicateVaaDoc
cursor, err := r.duplicateVaas.Find(ctx, bson.M{"vaaId": vaaID})
if err != nil {
return nil, err
}
if err = cursor.All(ctx, &duplicateVaaDocs); err != nil {
return nil, err
}
return duplicateVaaDocs, nil
}
// FixVAA fix a vaa by id.
func (r *Repository) FixVAA(ctx context.Context, vaaID, duplicateID string) error {
// start mongo transaction
session, err := r.vaas.Database().Client().StartSession()
if err != nil {
return err
}
err = session.StartTransaction()
if err != nil {
return err
}
// get VAA by id
vaaDoc, err := r.FindVAAById(ctx, vaaID)
if err != nil {
session.AbortTransaction(ctx)
return err
}
// get duplicate vaa by id
duplicateVaaDoc, err := r.FindDuplicateVAAById(ctx, duplicateID)
if err != nil {
session.AbortTransaction(ctx)
return err
}
// create new vaa and new duplicate vaa
newVaa := duplicateVaaDoc.ToVaaDoc(true)
newDuplicateVaa, err := vaaDoc.ToDuplicateVaaDoc()
if err != nil {
session.AbortTransaction(ctx)
return err
}
// remove vaa
_, err = r.vaas.DeleteOne(ctx, bson.M{"_id": vaaID})
if err != nil {
session.AbortTransaction(ctx)
return err
}
// remove duplicate vaa
_, err = r.duplicateVaas.DeleteOne(ctx, bson.M{"_id": duplicateID})
if err != nil {
session.AbortTransaction(ctx)
return err
}
// insert new vaa
_, err = r.vaas.InsertOne(ctx, newVaa)
if err != nil {
session.AbortTransaction(ctx)
return err
}
// insert new duplicate vaa
_, err = r.duplicateVaas.InsertOne(ctx, newDuplicateVaa)
if err != nil {
session.AbortTransaction(ctx)
return err
}
// commit transaction
err = session.CommitTransaction(ctx)
if err != nil {
session.AbortTransaction(ctx)
return err
}
return nil
}

View File

@ -0,0 +1,87 @@
package storage
import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// VaaDoc represents a VAA document.
type VaaDoc struct {
ID string `bson:"_id"`
Version uint8 `bson:"version"`
EmitterChain sdk.ChainID `bson:"emitterChain"`
EmitterAddr string `bson:"emitterAddr"`
Sequence string `bson:"sequence"`
GuardianSetIndex uint32 `bson:"guardianSetIndex"`
Vaa []byte `bson:"vaas"`
TxHash string `bson:"txHash,omitempty"`
OriginTxHash *string `bson:"_originTxHash,omitempty"` //this is temporary field for fix enconding txHash
Timestamp *time.Time `bson:"timestamp"`
UpdatedAt *time.Time `bson:"updatedAt"`
Digest string `bson:"digest"`
IsDuplicated bool `bson:"isDuplicated"`
DuplicatedFixed bool `bson:"duplicatedFixed"`
}
// DuplicateVaaDoc represents a duplicate VAA document.
type DuplicateVaaDoc struct {
ID string `bson:"_id"`
VaaID string `bson:"vaaId"`
Version uint8 `bson:"version"`
EmitterChain sdk.ChainID `bson:"emitterChain"`
EmitterAddr string `bson:"emitterAddr"`
Sequence string `bson:"sequence"`
GuardianSetIndex uint32 `bson:"guardianSetIndex"`
Vaa []byte `bson:"vaas"`
Digest string `bson:"digest"`
ConsistencyLevel uint8 `bson:"consistencyLevel"`
TxHash string `bson:"txHash,omitempty"`
Timestamp *time.Time `bson:"timestamp"`
UpdatedAt *time.Time `bson:"updatedAt"`
}
func (d *DuplicateVaaDoc) ToVaaDoc(duplicatedFixed bool) *VaaDoc {
return &VaaDoc{
ID: d.VaaID,
Version: d.Version,
EmitterChain: d.EmitterChain,
EmitterAddr: d.EmitterAddr,
Sequence: d.Sequence,
GuardianSetIndex: d.GuardianSetIndex,
Vaa: d.Vaa,
Digest: d.Digest,
TxHash: d.TxHash,
OriginTxHash: nil,
Timestamp: d.Timestamp,
UpdatedAt: d.UpdatedAt,
DuplicatedFixed: duplicatedFixed,
IsDuplicated: true,
}
}
func (v *VaaDoc) ToDuplicateVaaDoc() (*DuplicateVaaDoc, error) {
vaa, err := vaa.Unmarshal(v.Vaa)
if err != nil {
return nil, err
}
uniqueId := domain.CreateUniqueVaaID(vaa)
return &DuplicateVaaDoc{
ID: uniqueId,
VaaID: v.ID,
Version: v.Version,
EmitterChain: v.EmitterChain,
EmitterAddr: v.EmitterAddr,
Sequence: v.Sequence,
GuardianSetIndex: v.GuardianSetIndex,
Vaa: v.Vaa,
Digest: v.Digest,
TxHash: v.TxHash,
ConsistencyLevel: vaa.ConsistencyLevel,
Timestamp: v.Timestamp,
UpdatedAt: v.UpdatedAt,
}, nil
}

View File

@ -9,6 +9,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/fly/event"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
@ -41,6 +42,7 @@ func RunTxHashEncoding(cfg TxHashEncondingConfig) {
db.Database,
producer.NewVAAInMemory(logger).Push,
txhash.NewMongoTxHash(db.Database, logger),
event.NewNoopEventDispatcher(),
logger)
workerTxHashEncoding(ctx, logger, repository, vaa.ChainID(cfg.ChainID), cfg.PageSize)

View File

@ -7,6 +7,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
aws_sns "github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/track"
)
type SnsEventDispatcher struct {
@ -23,9 +24,10 @@ func NewSnsEventDispatcher(awsConfig aws.Config, url string) (*SnsEventDispatche
func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error {
body, err := json.Marshal(event{
Type: "duplicated-vaa",
Source: "fly",
Data: e,
TrackID: track.GetTrackIDForDuplicatedVAA(e.VaaID),
Type: "duplicated-vaa",
Source: "fly",
Data: e,
})
if err != nil {
return err

View File

@ -17,9 +17,10 @@ type DuplicateVaa struct {
}
type event struct {
Type string `json:"type"`
Source string `json:"source"`
Data any `json:"data"`
TrackID string `json:"trackId"`
Type string `json:"type"`
Source string `json:"source"`
Data any `json:"data"`
}
type EventDispatcher interface {

View File

@ -11,3 +11,8 @@ func GetTrackID(vaaID string) string {
uuid := uuid.New()
return fmt.Sprintf("gossip-signed-vaa-%s-%s", vaaID, uuid.String())
}
func GetTrackIDForDuplicatedVAA(vaaID string) string {
uuid := uuid.New()
return fmt.Sprintf("fly-duplicated-vaa-%s-%s", vaaID, uuid.String())
}