This commit is contained in:
Agustin Pazos 2024-04-24 12:55:57 -03:00
parent b638aaafd7
commit 01afaab67b
7 changed files with 79 additions and 311 deletions

View File

@ -2,7 +2,6 @@ package consumer
import (
"context"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
@ -61,8 +60,11 @@ func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMes
case <-ctx.Done():
return
case msg := <-ch:
fmt.Print(msg.Data()) //TODO: remove this line
//TODO
c.processEvent(ctx, *msg.Data())
}
}
}
func (c *Consumer) processEvent(ctx context.Context, event queue.Event) {
//TODO
}

View File

@ -1 +0,0 @@
package consumer

View File

@ -2,6 +2,8 @@ package queue
import (
"context"
"encoding/json"
"strconv"
"sync"
"time"
@ -55,52 +57,52 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}
q.logger.Debug("Received messages from SQS", zap.Int("count", len(messages)))
// TODO: change this
//expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
//for _, msg := range messages {
// unmarshal body to sqsEvent
// var sqsEvent sqsEvent
// err := json.Unmarshal([]byte(*msg.Body), &sqsEvent)
// if err != nil {
// q.logger.Error("Error decoding message from SQS", zap.Error(err))
// if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
// q.logger.Error("Error deleting message from SQS", zap.Error(err))
// }
// continue
// }
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
for _, msg := range messages {
// unmarshal body to sqsEvent
var sqsEvent sqsEvent
err := json.Unmarshal([]byte(*msg.Body), &sqsEvent)
if err != nil {
q.logger.Error("Error decoding message from SQS", zap.Error(err))
if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
continue
}
// // unmarshal message to event
// event, err := q.converter(sqsEvent.Message)
// if err != nil {
// q.logger.Error("Error converting event message", zap.Error(err))
// if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
// q.logger.Error("Error deleting message from SQS", zap.Error(err))
// }
// continue
// }
// if event == nil {
// q.logger.Warn("Can not handle message", zap.String("body", *msg.Body))
// if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
// q.logger.Error("Error deleting message from SQS", zap.Error(err))
// }
// continue
// }
// q.metrics.IncVaaConsumedQueue(event.ChainID.String(), event.Source)
var event *Event
err = json.Unmarshal([]byte(sqsEvent.Message), event)
if err != nil {
q.logger.Error("Error decoding message from SQS", zap.Error(err))
if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
continue
}
// retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
// q.wg.Add(1)
// q.ch <- &sqsConsumerMessage{
// id: msg.ReceiptHandle,
// data: event,
// wg: &q.wg,
// logger: q.logger,
// consumer: q.consumer,
// expiredAt: expiredAt,
// retry: uint8(retry),
// metrics: q.metrics,
// ctx: ctx,
// }
//}
if event == nil {
q.logger.Warn("Can not handle message", zap.String("body", *msg.Body))
if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
continue
}
// TODO add metrics to DuplicateVAA consume
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
retry: uint8(retry),
metrics: q.metrics,
ctx: ctx,
}
}
q.wg.Wait()
}
@ -132,18 +134,18 @@ func (m *sqsConsumerMessage) Data() *Event {
func (m *sqsConsumerMessage) Done() {
if err := m.consumer.DeleteMessage(m.ctx, m.id); err != nil {
m.logger.Error("Error deleting message from SQS",
//zap.String("vaaId", m.data.ID),
zap.String("vaaId", m.data.Data.VaaID),
zap.Bool("isExpired", m.IsExpired()),
zap.Time("expiredAt", m.expiredAt),
zap.Error(err),
)
}
//m.metrics.IncVaaProcessed(uint16(m.data.ChainID), m.retry)
// TODO add metrics to DuplicateVAA consume
m.wg.Done()
}
func (m *sqsConsumerMessage) Failed() {
//m.metrics.IncVaaFailed(uint16(m.data.ChainID), m.retry)
// TODO add metrics to duplicateVAA consume failed
m.wg.Done()
}

View File

@ -1,9 +1,31 @@
package queue
import "context"
import (
"context"
"time"
)
// sqsEvent represents a event data from SQS.
type sqsEvent struct {
MessageID string `json:"MessageId"`
Message string `json:"Message"`
}
// Event represents a event data to be handle.
type Event struct {
Type string `json:"type"`
Source string `json:"source"`
Data DuplicateVaa `json:"data"`
}
type DuplicateVaa struct {
VaaID string `json:"vaaId"`
Version uint8 `json:"version"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
Digest string `json:"digest"`
ConsistencyLevel uint8 `json:"consistencyLevel"`
Timestamp *time.Time `json:"timestamp"`
}
// ConsumerMessage defition.

View File

@ -7,6 +7,7 @@ import (
type DuplicateVaa struct {
VaaID string `json:"vaaId"`
ChainID uint16 `json:"chainId"`
Version uint8 `json:"version"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`

View File

@ -523,6 +523,7 @@ func (s *Repository) UpsertDuplicateVaa(ctx context.Context, v *vaa.VAA, seriali
}
return s.eventDispatcher.NewDuplicateVaa(ctx, event.DuplicateVaa{
VaaID: v.MessageID(),
ChainID: uint16(v.EmitterChain),
Version: v.Version,
GuardianSetIndex: v.GuardianSetIndex,
Vaa: serializedVaa,

View File

@ -159,270 +159,11 @@ func (r *Repository) AlreadyProcessed(ctx context.Context, vaaId string) (bool,
}
}
// CountDocumentsByTimeRange returns the number of documents that match the given time range.
func (r *Repository) CountDocumentsByTimeRange(
ctx context.Context,
timeAfter time.Time,
timeBefore time.Time,
) (uint64, error) {
// Build the aggregation pipeline
var pipeline mongo.Pipeline
{
// filter by time range
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$gte", timeAfter}}},
}},
})
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$lte", timeBefore}}},
}},
})
// Count the number of results
pipeline = append(pipeline, bson.D{
{"$count", "numDocuments"},
})
}
// Execute the aggregation pipeline
cur, err := r.vaas.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return 0, err
}
// Read results from cursor
var results []struct {
NumDocuments uint64 `bson:"numDocuments"`
}
err = cur.All(ctx, &results)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return 0, err
}
if len(results) == 0 {
return 0, nil
}
if len(results) > 1 {
r.logger.Error("too many results", zap.Int("numResults", len(results)))
return 0, err
}
return results[0].NumDocuments, nil
}
// CountIncompleteDocuments returns the number of documents that have destTx data, but don't have sourceTx data.
func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, error) {
// Build the aggregation pipeline
var pipeline mongo.Pipeline
{
// Look up transactions that either:
// 1. have not been processed
// 2. have been processed, but encountered an internal error
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"$or", bson.A{
bson.D{{"originTx", bson.D{{"$exists", false}}}},
bson.D{{"originTx.status", bson.M{"$eq": domain.SourceTxStatusInternalError}}},
}},
}},
})
// Count the number of results
pipeline = append(pipeline, bson.D{
{"$count", "numDocuments"},
})
}
// Execute the aggregation pipeline
cur, err := r.globalTransactions.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return 0, err
}
// Read results from cursor
var results []struct {
NumDocuments uint64 `bson:"numDocuments"`
}
err = cur.All(ctx, &results)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return 0, err
}
if len(results) == 0 {
return 0, nil
}
if len(results) > 1 {
r.logger.Error("too many results", zap.Int("numResults", len(results)))
return 0, err
}
return results[0].NumDocuments, nil
}
type GlobalTransaction struct {
Id string `bson:"_id"`
Vaas []vaa.VaaDoc `bson:"vaas"`
}
// GetDocumentsByTimeRange iterates through documents within a specified time range.
func (r *Repository) GetDocumentsByTimeRange(
ctx context.Context,
lastId string,
lastTimestamp *time.Time,
limit uint,
timeAfter time.Time,
timeBefore time.Time,
) ([]GlobalTransaction, error) {
// Build the aggregation pipeline
var pipeline mongo.Pipeline
{
// Specify sorting criteria
pipeline = append(pipeline, bson.D{
{"$sort", bson.D{
bson.E{"timestamp", -1},
bson.E{"_id", 1},
}},
})
// filter out already processed documents
//
// We use the timestap field as a pagination cursor
if lastTimestamp != nil {
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"$or", bson.A{
bson.D{{"timestamp", bson.M{"$lt": *lastTimestamp}}},
bson.D{{"$and", bson.A{
bson.D{{"timestamp", bson.M{"$eq": *lastTimestamp}}},
bson.D{{"_id", bson.M{"$gt": lastId}}},
}}},
}},
}},
})
}
// filter by time range
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$gte", timeAfter}}},
}},
})
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$lte", timeBefore}}},
}},
})
// Limit size of results
pipeline = append(pipeline, bson.D{
{"$limit", limit},
})
}
// Execute the aggregation pipeline
cur, err := r.vaas.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return nil, errors.WithStack(err)
}
// Read results from cursor
var documents []vaa.VaaDoc
err = cur.All(ctx, &documents)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return nil, errors.WithStack(err)
}
// Build the result
var globalTransactions []GlobalTransaction
for i := range documents {
globalTransaction := GlobalTransaction{
Id: documents[i].ID,
Vaas: []vaa.VaaDoc{documents[i]},
}
globalTransactions = append(globalTransactions, globalTransaction)
}
return globalTransactions, nil
}
// GetIncompleteDocuments gets a batch of VAA IDs from the database.
func (r *Repository) GetIncompleteDocuments(
ctx context.Context,
lastId string,
lastTimestamp *time.Time,
limit uint,
) ([]GlobalTransaction, error) {
// Build the aggregation pipeline
var pipeline mongo.Pipeline
{
// Specify sorting criteria
pipeline = append(pipeline, bson.D{
{"$sort", bson.D{bson.E{"_id", 1}}},
})
// filter out already processed documents
//
// We use the _id field as a pagination cursor
pipeline = append(pipeline, bson.D{
{"$match", bson.D{{"_id", bson.M{"$gt": lastId}}}},
})
// Look up transactions that either:
// 1. have not been processed
// 2. have been processed, but encountered an internal error
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"$or", bson.A{
bson.D{{"originTx", bson.D{{"$exists", false}}}},
bson.D{{"originTx.status", bson.M{"$eq": domain.SourceTxStatusInternalError}}},
}},
}},
})
// Left join on the VAA collection
pipeline = append(pipeline, bson.D{
{"$lookup", bson.D{
{"from", "vaas"},
{"localField", "_id"},
{"foreignField", "_id"},
{"as", "vaas"},
}},
})
// Limit size of results
pipeline = append(pipeline, bson.D{
{"$limit", limit},
})
}
// Execute the aggregation pipeline
cur, err := r.globalTransactions.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return nil, errors.WithStack(err)
}
// Read results from cursor
var documents []GlobalTransaction
err = cur.All(ctx, &documents)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return nil, errors.WithStack(err)
}
return documents, nil
}
// VaaIdTxHash represents a vaaIdTxHash document.
type VaaIdTxHash struct {
TxHash string `bson:"txHash"`