From 9873e2aadf7eae2cdfac85d475f6a4db7e27a026 Mon Sep 17 00:00:00 2001 From: Fernando Torres Date: Wed, 24 Apr 2024 16:51:20 -0300 Subject: [PATCH] wip --- pipeline/pipeline/publisher.go | 3 ++- pipeline/topic/topic.go | 3 ++- pipeline/watcher/watcher.go | 3 ++- tx-tracker/consumer/consumer.go | 2 +- tx-tracker/queue/converter.go | 2 ++ tx-tracker/queue/types.go | 1 + 6 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pipeline/pipeline/publisher.go b/pipeline/pipeline/publisher.go index f53ecb20..a2a64051 100644 --- a/pipeline/pipeline/publisher.go +++ b/pipeline/pipeline/publisher.go @@ -49,7 +49,8 @@ func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) { TxHash: e.TxHash, Version: e.Version, Revision: e.Revision, - Hash: e.Hash, + Digest: e.Digest, + Overwrite: e.DuplicatedFixed, } // In some scenarios the fly component that inserts the VAA documents does not have the txhash field available, diff --git a/pipeline/topic/topic.go b/pipeline/topic/topic.go index 2beb856c..65312ff6 100644 --- a/pipeline/topic/topic.go +++ b/pipeline/topic/topic.go @@ -19,7 +19,8 @@ type Event struct { TxHash string `json:"txHash"` Version uint16 `json:"version"` Revision uint16 `json:"revision"` - Hash []byte `json:"hash"` + Digest string `json:"digest"` + Overwrite bool `json:"overwrite"` } // PushFunc is a function to push VAAEvent. diff --git a/pipeline/watcher/watcher.go b/pipeline/watcher/watcher.go index 01c76306..f07d1bc3 100644 --- a/pipeline/watcher/watcher.go +++ b/pipeline/watcher/watcher.go @@ -50,8 +50,9 @@ type Event struct { TxHash string `bson:"txHash"` Version uint16 `bson:"version"` Revision uint16 `bson:"revision"` - Hash []byte `bson:"hash"` + Digest string `bson:"digest"` IsDuplicated bool `bson:"isDuplicated"` + DuplicatedFixed bool `bson:"duplicatedFixed"` } const queryTemplate = ` diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index bc7fb0e9..b516b16f 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -114,7 +114,7 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag Vaa: event.Vaa, IsVaaSigned: event.IsVaaSigned, Metrics: c.metrics, - Overwrite: false, // avoid processing the same transaction twice + Overwrite: event.Overwrite, // avoid processing the same transaction twice Source: event.Source, } _, err := ProcessSourceTx(ctx, c.logger, c.rpcpool, c.wormchainRpcPool, c.repository, &p, c.p2pNetwork) diff --git a/tx-tracker/queue/converter.go b/tx-tracker/queue/converter.go index 60a37e82..474be06e 100644 --- a/tx-tracker/queue/converter.go +++ b/tx-tracker/queue/converter.go @@ -25,6 +25,7 @@ type VaaEvent struct { TxHash string `json:"txHash"` Version uint16 `json:"version"` Revision uint16 `json:"revision"` + Overwrite bool `json:"overwrite"` } // VaaConverter converts a message from a VAAEvent. @@ -49,6 +50,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc { Vaa: vaaEvent.Vaa, IsVaaSigned: true, TxHash: vaaEvent.TxHash, + Overwrite: vaaEvent.Overwrite, }, nil } } diff --git a/tx-tracker/queue/types.go b/tx-tracker/queue/types.go index 5a6a2b6c..51ae96c6 100644 --- a/tx-tracker/queue/types.go +++ b/tx-tracker/queue/types.go @@ -51,6 +51,7 @@ type Event struct { Vaa []byte IsVaaSigned bool Attributes any + Overwrite bool } func GetAttributes[T EventAttributes](e *Event) (T, bool) {