Handle unproccessable response from vaa-payload-parser (#1250)

Add retry in analytics metrics

Co-authored-by: walker-16 <agpazos85@gmail.com>
This commit is contained in:
ftocal 2024-03-26 11:53:11 -03:00 committed by GitHub
parent adb7231074
commit f4390b8436
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 47 additions and 31 deletions

View File

@ -81,6 +81,11 @@ func (r *TokenResolver) GetTransferredTokenByVaa(ctx context.Context, vaa *sdk.V
// Parse the VAA with standarized properties
result, err := r.client.ParseVaaWithStandarizedProperties(vaa)
if err != nil {
if errors.Is(err, parser.ErrUnprocessableEntity) {
r.logger.Debug("Parsing vaa with standarized properties resulted in unprocessable entity",
zap.String("vaaId", vaa.MessageID()))
return nil, nil
}
r.logger.Error("Parsing vaa with standarized properties",
zap.String("vaaId", vaa.MessageID()),
zap.Error(err))

View File

@ -36,7 +36,7 @@ func (c *Consumer) Start(ctx context.Context) {
if msg.IsExpired() {
msg.Failed()
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
c.metrics.IncExpiredMessage(chainID, event.Source)
c.metrics.IncExpiredMessage(chainID, event.Source, msg.Retry())
continue
}
@ -45,7 +45,7 @@ func (c *Consumer) Start(ctx context.Context) {
if err != nil {
msg.Done()
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
c.metrics.IncInvalidMessage(chainID, event.Source)
c.metrics.IncInvalidMessage(chainID, event.Source, msg.Retry())
continue
}
@ -53,13 +53,13 @@ func (c *Consumer) Start(ctx context.Context) {
err = c.pushMetric(ctx, &metric.Params{TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned})
if err != nil {
msg.Failed()
c.metrics.IncUnprocessedMessage(chainID, event.Source)
c.metrics.IncUnprocessedMessage(chainID, event.Source, msg.Retry())
continue
}
msg.Done()
c.logger.Debug("Pushed vaa metric", zap.String("id", event.ID))
c.metrics.IncProcessedMessage(chainID, event.Source)
c.metrics.IncProcessedMessage(chainID, event.Source, msg.Retry())
}
}()
}

View File

@ -9,8 +9,8 @@ type Metrics interface {
IncFoundNotional(symbol string)
IncMissingToken(chain, token string)
IncFoundToken(chain, token string)
IncExpiredMessage(chain, source string)
IncInvalidMessage(chain, source string)
IncUnprocessedMessage(chain, source string)
IncProcessedMessage(chain, source string)
IncExpiredMessage(chain, source string, retry uint8)
IncInvalidMessage(chain, source string, retry uint8)
IncUnprocessedMessage(chain, source string, retry uint8)
IncProcessedMessage(chain, source string, retry uint8)
}

View File

@ -27,14 +27,14 @@ func (p *NoopMetrics) IncMissingToken(chain, token string) {
func (p *NoopMetrics) IncFoundToken(chain, token string) {
}
func (p *NoopMetrics) IncExpiredMessage(chain, source string) {
func (p *NoopMetrics) IncExpiredMessage(chain, source string, retry uint8) {
}
func (p *NoopMetrics) IncInvalidMessage(chain, source string) {
func (p *NoopMetrics) IncInvalidMessage(chain, source string, retry uint8) {
}
func (p *NoopMetrics) IncUnprocessedMessage(chain, source string) {
func (p *NoopMetrics) IncUnprocessedMessage(chain, source string, retry uint8) {
}
func (p *NoopMetrics) IncProcessedMessage(chain, source string) {
func (p *NoopMetrics) IncProcessedMessage(chain, source string, retry uint8) {
}

View File

@ -1,6 +1,8 @@
package metrics
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
@ -51,7 +53,7 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
Help: "Total number of processed message",
ConstLabels: constLabels,
},
[]string{"chain", "source", "status"},
[]string{"chain", "source", "status", "retry"},
)
return &PrometheusMetrics{
measurementCount: measurementCount,
@ -85,18 +87,18 @@ func (p *PrometheusMetrics) IncFoundToken(chain, token string) {
p.tokenRequestsCount.WithLabelValues(chain, token, "found").Inc()
}
func (p *PrometheusMetrics) IncExpiredMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "expired").Inc()
func (p *PrometheusMetrics) IncExpiredMessage(chain, source string, retry uint8) {
p.processedMessage.WithLabelValues(chain, source, "expired", fmt.Sprintf("%d", retry)).Inc()
}
func (p *PrometheusMetrics) IncInvalidMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "invalid").Inc()
func (p *PrometheusMetrics) IncInvalidMessage(chain, source string, retry uint8) {
p.processedMessage.WithLabelValues(chain, source, "invalid", fmt.Sprintf("%d", retry)).Inc()
}
func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "unprocessed").Inc()
func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string, retry uint8) {
p.processedMessage.WithLabelValues(chain, source, "unprocessed", fmt.Sprintf("%d", retry)).Inc()
}
func (p *PrometheusMetrics) IncProcessedMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "processed").Inc()
func (p *PrometheusMetrics) IncProcessedMessage(chain, source string, retry uint8) {
p.processedMessage.WithLabelValues(chain, source, "processed", fmt.Sprintf("%d", retry)).Inc()
}

View File

@ -69,7 +69,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
case events.SignedVaaType:
signedVaa, err := events.GetEventData[events.SignedVaa](&notification)
if err != nil {
log.Error("Error decoding signedVAA from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
log.Debug("Error decoding signedVAA from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil, nil
}
@ -92,7 +92,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
vaa, err := events.CreateUnsignedVAA(&plm)
if err != nil {
log.Error("Error creating unsigned vaa", zap.String("trackId", notification.TrackID), zap.Error(err))
log.Debug("Error creating unsigned vaa", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil, err
}

View File

@ -3,6 +3,7 @@ package queue
import (
"context"
"encoding/json"
"strconv"
"sync"
"time"
@ -65,7 +66,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
var sqsEvent sqsEvent
err := json.Unmarshal([]byte(*msg.Body), &sqsEvent)
if err != nil {
q.logger.Error("Error decoding message from SQS", zap.Error(err))
q.logger.Error("Error decoding message from SQS", zap.Error(err), zap.String("body", *msg.Body))
if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
@ -75,7 +76,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
// converts message to event
event, err := q.converter(sqsEvent.Message)
if err != nil {
q.logger.Error("Error converting event message", zap.Error(err))
q.logger.Error("Error converting event message", zap.Error(err), zap.String("body", *msg.Body))
if err = q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
@ -90,6 +91,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
@ -97,6 +99,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
retry: uint8(retry),
expiredAt: expiredAt,
ctx: ctx,
}
@ -119,6 +122,7 @@ type sqsConsumerMessage struct {
wg *sync.WaitGroup
id *string
logger *zap.Logger
retry uint8
expiredAt time.Time
ctx context.Context
}
@ -141,3 +145,7 @@ func (m *sqsConsumerMessage) Failed() {
func (m *sqsConsumerMessage) IsExpired() bool {
return m.expiredAt.Before(time.Now())
}
func (m *sqsConsumerMessage) Retry() uint8 {
return m.retry
}

View File

@ -25,6 +25,7 @@ type Event struct {
// ConsumerMessage defition.
type ConsumerMessage interface {
Retry() uint8
Data() *Event
Done()
Failed()

View File

@ -19,7 +19,7 @@ var (
ErrCallEndpoint = errors.New("ERROR CALL ENPOINT")
ErrNotFound = errors.New("NOT FOUND")
ErrInternalError = errors.New("INTERNAL ERROR")
ErrUnproceesableEntity = errors.New("UNPROCESSABLE")
ErrUnprocessableEntity = errors.New("UNPROCESSABLE")
ErrBadRequest = errors.New("BAD REQUEST")
)
@ -102,7 +102,7 @@ func (c ParserVAAAPIClient) ParsePayload(chainID uint16, address, sequence strin
case http.StatusBadRequest:
return nil, ErrBadRequest
case http.StatusUnprocessableEntity:
return nil, ErrUnproceesableEntity
return nil, ErrUnprocessableEntity
default:
return nil, ErrInternalError
}
@ -157,7 +157,7 @@ func (c *ParserVAAAPIClient) ParseVaaWithStandarizedProperties(vaa *sdk.VAA) (*P
case http.StatusBadRequest:
return nil, ErrBadRequest
case http.StatusUnprocessableEntity:
return nil, ErrUnproceesableEntity
return nil, ErrUnprocessableEntity
default:
return nil, ErrInternalError
}
@ -191,7 +191,7 @@ func (c *ParserVAAAPIClient) ParseVaa(vaa *sdk.VAA) (any, error) {
case http.StatusBadRequest:
return nil, ErrBadRequest
case http.StatusUnprocessableEntity:
return nil, ErrUnproceesableEntity
return nil, ErrUnprocessableEntity
default:
return nil, ErrInternalError
}

View File

@ -109,8 +109,8 @@ func TestUnprocessableVaaParser(t *testing.T) {
if err == nil {
t.Error("expected error, got nil")
}
if !errors.Is(err, ErrUnproceesableEntity) {
t.Errorf("expected ErrUnproceesableEntity, got %s", err.Error())
if !errors.Is(err, ErrUnprocessableEntity) {
t.Errorf("expected ErrUnprocessableEntity, got %s", err.Error())
}
if parserVaaResponse != nil {
t.Error("expected parserVaaResponse zero value, got %w", parserVaaResponse)