wormhole-explorer/fly/queue/observation_sqs.go

92 lines
2.6 KiB
Go

package queue
import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
"time"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
"go.uber.org/zap"
)
// ObservationSqs represents a observation queue in SQS.
type ObservationSqs struct {
producer *sqs.Producer
consumer *sqs.Consumer
ch chan Message[*gossipv1.SignedObservation]
chSize int
wg sync.WaitGroup
logger *zap.Logger
}
// NewObservationSqs creates a observation queue in SQS instances.
func NewObservationSqs(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...VAASqsOption) *ObservationSqs {
s := &ObservationSqs{
producer: producer,
consumer: consumer,
chSize: 10,
logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl()))}
s.ch = make(chan Message[*gossipv1.SignedObservation], s.chSize)
return s
}
// Publish sends the message to a SQS queue.
func (q *ObservationSqs) Publish(ctx context.Context, o *gossipv1.SignedObservation) error {
dto := toObservation(o)
body, err := json.Marshal(dto)
if err != nil {
return err
}
id := fmt.Sprintf("%s/%s/%s", o.MessageId, hex.EncodeToString(o.Addr), hex.EncodeToString(o.Hash))
deduplicationId := base64.StdEncoding.EncodeToString([]byte(id))[:127]
return q.producer.SendMessage(ctx, deduplicationId, deduplicationId, string(body))
}
// Consume returns the channel with the received messages from SQS queue.
func (q *ObservationSqs) Consume(ctx context.Context) <-chan Message[*gossipv1.SignedObservation] {
go func() {
for {
messages, err := q.consumer.GetMessages(ctx)
if err != nil {
q.logger.Error("Error getting messages from SQS", zap.Error(err))
continue
}
q.logger.Info("Received messages from SQS", zap.Int("count", len(messages)))
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
for _, msg := range messages {
var obs Observation
err := json.Unmarshal([]byte(*msg.Body), &obs)
if err != nil {
q.logger.Error("Error decoding message from SQS", zap.Error(err))
continue
}
q.logger.Info("Observation message received", zap.String("id", obs.MessageID))
//TODO check if callback is better than channel
q.wg.Add(1)
q.ch <- &sqsConsumerMessage[*gossipv1.SignedObservation]{
id: msg.ReceiptHandle,
data: fromObservation(&obs),
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
ctx: ctx,
}
}
q.wg.Wait()
}
}()
return q.ch
}
// Close closes all consumer resources.
func (q *ObservationSqs) Close() {
close(q.ch)
}