Fix: parser memory leaks in sqs consumer (#122)
Co-authored-by: walker-16 <agpazos85@gmail.com>
This commit is contained in:
parent
a6cb972eb1
commit
821071086d
|
@ -83,10 +83,10 @@ func (c *Consumer) GetMessages(ctx context.Context) ([]aws_sqs_types.Message, er
|
|||
}
|
||||
|
||||
// DeleteMessage deletes messages from SQS.
|
||||
func (c *Consumer) DeleteMessage(ctx context.Context, msg *aws_sqs_types.Message) error {
|
||||
func (c *Consumer) DeleteMessage(ctx context.Context, id *string) error {
|
||||
params := &aws_sqs.DeleteMessageInput{
|
||||
QueueUrl: aws.String(c.url),
|
||||
ReceiptHandle: msg.ReceiptHandle,
|
||||
ReceiptHandle: id,
|
||||
}
|
||||
_, err := c.api.DeleteMessage(ctx, params)
|
||||
|
||||
|
|
|
@ -26,57 +26,55 @@ func NewConsumer(consume queue.VAAConsumeFunc, repository *parser.Repository, pa
|
|||
// Start consumes messages from VAA queue, parse and store those messages in a repository.
|
||||
func (c *Consumer) Start(ctx context.Context) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg := <-c.consume(ctx):
|
||||
event := msg.Data
|
||||
for msg := range c.consume(ctx) {
|
||||
event := msg.Data()
|
||||
|
||||
// check id message is expired.
|
||||
if msg.IsExpired() {
|
||||
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID()))
|
||||
continue
|
||||
}
|
||||
|
||||
// call vaa-payload-parser api to parse a VAA.
|
||||
sequence := strconv.FormatUint(event.Sequence, 10)
|
||||
vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, sequence, event.Vaa)
|
||||
if err != nil {
|
||||
if errors.Is(err, parser.ErrInternalError) {
|
||||
c.logger.Info("error parsing VAA, will retry later", zap.Uint16("chainID", event.ChainID),
|
||||
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
c.logger.Info("VAA cannot be parsed", zap.Uint16("chainID", event.ChainID),
|
||||
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
|
||||
msg.Ack()
|
||||
continue
|
||||
}
|
||||
|
||||
// create ParsedVaaUpdate to upsert.
|
||||
now := time.Now()
|
||||
vaaParsed := parser.ParsedVaaUpdate{
|
||||
ID: event.ID(),
|
||||
EmitterChain: event.ChainID,
|
||||
EmitterAddr: event.EmitterAddress,
|
||||
Sequence: strconv.FormatUint(event.Sequence, 10),
|
||||
AppID: vaaParseResponse.AppID,
|
||||
Result: vaaParseResponse.Result,
|
||||
UpdatedAt: &now,
|
||||
}
|
||||
|
||||
err = c.repository.UpsertParsedVaa(ctx, vaaParsed)
|
||||
if err != nil {
|
||||
c.logger.Error("Error inserting vaa in repository",
|
||||
zap.String("id", event.ID()),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
msg.Ack()
|
||||
c.logger.Info("Vaa save in repository", zap.String("id", event.ID()))
|
||||
// check id message is expired.
|
||||
if msg.IsExpired() {
|
||||
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID()))
|
||||
msg.Failed()
|
||||
continue
|
||||
}
|
||||
|
||||
// call vaa-payload-parser api to parse a VAA.
|
||||
sequence := strconv.FormatUint(event.Sequence, 10)
|
||||
vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, sequence, event.Vaa)
|
||||
if err != nil {
|
||||
if errors.Is(err, parser.ErrInternalError) {
|
||||
c.logger.Info("error parsing VAA, will retry later", zap.Uint16("chainID", event.ChainID),
|
||||
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
|
||||
msg.Failed()
|
||||
continue
|
||||
}
|
||||
|
||||
c.logger.Info("VAA cannot be parsed", zap.Uint16("chainID", event.ChainID),
|
||||
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
|
||||
msg.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
// create ParsedVaaUpdate to upsert.
|
||||
now := time.Now()
|
||||
vaaParsed := parser.ParsedVaaUpdate{
|
||||
ID: event.ID(),
|
||||
EmitterChain: event.ChainID,
|
||||
EmitterAddr: event.EmitterAddress,
|
||||
Sequence: strconv.FormatUint(event.Sequence, 10),
|
||||
AppID: vaaParseResponse.AppID,
|
||||
Result: vaaParseResponse.Result,
|
||||
UpdatedAt: &now,
|
||||
}
|
||||
|
||||
err = c.repository.UpsertParsedVaa(ctx, vaaParsed)
|
||||
if err != nil {
|
||||
c.logger.Error("Error inserting vaa in repository",
|
||||
zap.String("id", event.ID()),
|
||||
zap.Error(err))
|
||||
msg.Failed()
|
||||
continue
|
||||
}
|
||||
msg.Done()
|
||||
c.logger.Info("Vaa save in repository", zap.String("id", event.ID()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -14,10 +14,11 @@ type VaaEvent struct {
|
|||
}
|
||||
|
||||
// ConsumerMessage defition.
|
||||
type ConsumerMessage struct {
|
||||
Data *VaaEvent
|
||||
Ack func()
|
||||
IsExpired func() bool
|
||||
type ConsumerMessage interface {
|
||||
Data() *VaaEvent
|
||||
Done()
|
||||
Failed()
|
||||
IsExpired() bool
|
||||
}
|
||||
|
||||
// ID get vaa ID (chainID/emiiterAddress/sequence)
|
||||
|
@ -29,4 +30,4 @@ func (v *VaaEvent) ID() string {
|
|||
type VAAPushFunc func(context.Context, *VaaEvent) error
|
||||
|
||||
// VAAConsumeFunc is a function to consume VAAEvent.
|
||||
type VAAConsumeFunc func(context.Context) <-chan *ConsumerMessage
|
||||
type VAAConsumeFunc func(context.Context) <-chan ConsumerMessage
|
||||
|
|
|
@ -9,7 +9,7 @@ type VAAInMemoryOption func(*VAAInMemory)
|
|||
|
||||
// VAAInMemory represents VAA queue in memory.
|
||||
type VAAInMemory struct {
|
||||
ch chan *ConsumerMessage
|
||||
ch chan ConsumerMessage
|
||||
size int
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ func NewVAAInMemory(opts ...VAAInMemoryOption) *VAAInMemory {
|
|||
for _, opt := range opts {
|
||||
opt(m)
|
||||
}
|
||||
m.ch = make(chan *ConsumerMessage, m.size)
|
||||
m.ch = make(chan ConsumerMessage, m.size)
|
||||
return m
|
||||
}
|
||||
|
||||
|
@ -32,15 +32,29 @@ func WithSize(v int) VAAInMemoryOption {
|
|||
|
||||
// Publish sends the message to a channel.
|
||||
func (i *VAAInMemory) Publish(_ context.Context, message *VaaEvent) error {
|
||||
i.ch <- &ConsumerMessage{
|
||||
Data: message,
|
||||
Ack: func() {},
|
||||
IsExpired: func() bool { return false },
|
||||
i.ch <- &memoryConsumerMessage{
|
||||
data: message,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Consume returns the channel with the received messages.
|
||||
func (i *VAAInMemory) Consume(_ context.Context) <-chan *ConsumerMessage {
|
||||
func (i *VAAInMemory) Consume(_ context.Context) <-chan ConsumerMessage {
|
||||
return i.ch
|
||||
}
|
||||
|
||||
type memoryConsumerMessage struct {
|
||||
data *VaaEvent
|
||||
}
|
||||
|
||||
func (m *memoryConsumerMessage) Data() *VaaEvent {
|
||||
return m.data
|
||||
}
|
||||
|
||||
func (m *memoryConsumerMessage) Done() {}
|
||||
|
||||
func (m *memoryConsumerMessage) Failed() {}
|
||||
|
||||
func (m *memoryConsumerMessage) IsExpired() bool {
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
|
||||
|
@ -17,8 +18,9 @@ type SQSOption func(*SQS)
|
|||
type SQS struct {
|
||||
producer *sqs.Producer
|
||||
consumer *sqs.Consumer
|
||||
ch chan *ConsumerMessage
|
||||
ch chan ConsumerMessage
|
||||
chSize int
|
||||
wg sync.WaitGroup
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
|
@ -32,7 +34,7 @@ func NewVAASQS(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logge
|
|||
for _, opt := range opts {
|
||||
opt(s)
|
||||
}
|
||||
s.ch = make(chan *ConsumerMessage, s.chSize)
|
||||
s.ch = make(chan ConsumerMessage, s.chSize)
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -55,40 +57,36 @@ func (q *SQS) Publish(ctx context.Context, message *VaaEvent) error {
|
|||
}
|
||||
|
||||
// Consume returns the channel with the received messages from SQS queue.
|
||||
func (q *SQS) Consume(ctx context.Context) <-chan *ConsumerMessage {
|
||||
func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
messages, err := q.consumer.GetMessages(ctx)
|
||||
messages, err := q.consumer.GetMessages(ctx)
|
||||
if err != nil {
|
||||
q.logger.Error("Error getting messages from SQS", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
|
||||
for _, msg := range messages {
|
||||
var body VaaEvent
|
||||
err := json.Unmarshal([]byte(*msg.Body), &body)
|
||||
if err != nil {
|
||||
q.logger.Error("Error getting messages from SQS", zap.Error(err))
|
||||
q.logger.Error("Error decoding message from SQS", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
|
||||
for _, msg := range messages {
|
||||
var body VaaEvent
|
||||
err := json.Unmarshal([]byte(*msg.Body), &body)
|
||||
if err != nil {
|
||||
q.logger.Error("Error decoding message from SQS", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
q.ch <- &ConsumerMessage{
|
||||
Data: &body,
|
||||
Ack: func() {
|
||||
if err := q.consumer.DeleteMessage(ctx, &msg); err != nil {
|
||||
q.logger.Error("Error deleting message from SQS", zap.Error(err))
|
||||
}
|
||||
},
|
||||
IsExpired: func() bool {
|
||||
return expiredAt.Before(time.Now())
|
||||
},
|
||||
}
|
||||
q.wg.Add(1)
|
||||
q.ch <- &sqsConsumerMessage{
|
||||
id: msg.ReceiptHandle,
|
||||
data: &body,
|
||||
wg: &q.wg,
|
||||
logger: q.logger,
|
||||
consumer: q.consumer,
|
||||
expiredAt: expiredAt,
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
q.wg.Wait()
|
||||
}
|
||||
|
||||
}()
|
||||
return q.ch
|
||||
}
|
||||
|
@ -97,3 +95,32 @@ func (q *SQS) Consume(ctx context.Context) <-chan *ConsumerMessage {
|
|||
func (q *SQS) Close() {
|
||||
close(q.ch)
|
||||
}
|
||||
|
||||
type sqsConsumerMessage struct {
|
||||
data *VaaEvent
|
||||
consumer *sqs.Consumer
|
||||
wg *sync.WaitGroup
|
||||
id *string
|
||||
logger *zap.Logger
|
||||
expiredAt time.Time
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (m *sqsConsumerMessage) Data() *VaaEvent {
|
||||
return m.data
|
||||
}
|
||||
|
||||
func (m *sqsConsumerMessage) Done() {
|
||||
if err := m.consumer.DeleteMessage(m.ctx, m.id); err != nil {
|
||||
m.logger.Error("Error deleting message from SQS", zap.Error(err))
|
||||
}
|
||||
m.wg.Done()
|
||||
}
|
||||
|
||||
func (m *sqsConsumerMessage) Failed() {
|
||||
m.wg.Done()
|
||||
}
|
||||
|
||||
func (m *sqsConsumerMessage) IsExpired() bool {
|
||||
return m.expiredAt.Before(time.Now())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue