92 lines
2.6 KiB
Go
92 lines
2.6 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
|
|
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// ObservationSqs represents a observation queue in SQS.
|
|
type ObservationSqs struct {
|
|
producer *sqs.Producer
|
|
consumer *sqs.Consumer
|
|
ch chan Message[*gossipv1.SignedObservation]
|
|
chSize int
|
|
wg sync.WaitGroup
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewObservationSqs creates a observation queue in SQS instances.
|
|
func NewObservationSqs(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...VAASqsOption) *ObservationSqs {
|
|
s := &ObservationSqs{
|
|
producer: producer,
|
|
consumer: consumer,
|
|
chSize: 10,
|
|
logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl()))}
|
|
s.ch = make(chan Message[*gossipv1.SignedObservation], s.chSize)
|
|
return s
|
|
}
|
|
|
|
// Publish sends the message to a SQS queue.
|
|
func (q *ObservationSqs) Publish(ctx context.Context, o *gossipv1.SignedObservation) error {
|
|
dto := toObservation(o)
|
|
body, err := json.Marshal(dto)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
id := fmt.Sprintf("%s/%s/%s", o.MessageId, hex.EncodeToString(o.Addr), hex.EncodeToString(o.Hash))
|
|
deduplicationId := base64.StdEncoding.EncodeToString([]byte(id))[:127]
|
|
return q.producer.SendMessage(ctx, deduplicationId, deduplicationId, string(body))
|
|
}
|
|
|
|
// Consume returns the channel with the received messages from SQS queue.
|
|
func (q *ObservationSqs) Consume(ctx context.Context) <-chan Message[*gossipv1.SignedObservation] {
|
|
go func() {
|
|
for {
|
|
messages, err := q.consumer.GetMessages(ctx)
|
|
if err != nil {
|
|
q.logger.Error("Error getting messages from SQS", zap.Error(err))
|
|
continue
|
|
}
|
|
q.logger.Info("Received messages from SQS", zap.Int("count", len(messages)))
|
|
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
|
|
for _, msg := range messages {
|
|
var obs Observation
|
|
err := json.Unmarshal([]byte(*msg.Body), &obs)
|
|
if err != nil {
|
|
q.logger.Error("Error decoding message from SQS", zap.Error(err))
|
|
continue
|
|
}
|
|
q.logger.Info("Observation message received", zap.String("id", obs.MessageID))
|
|
|
|
//TODO check if callback is better than channel
|
|
q.wg.Add(1)
|
|
q.ch <- &sqsConsumerMessage[*gossipv1.SignedObservation]{
|
|
id: msg.ReceiptHandle,
|
|
data: fromObservation(&obs),
|
|
wg: &q.wg,
|
|
logger: q.logger,
|
|
consumer: q.consumer,
|
|
expiredAt: expiredAt,
|
|
ctx: ctx,
|
|
}
|
|
}
|
|
q.wg.Wait()
|
|
}
|
|
}()
|
|
return q.ch
|
|
}
|
|
|
|
// Close closes all consumer resources.
|
|
func (q *ObservationSqs) Close() {
|
|
close(q.ch)
|
|
}
|