wormhole-explorer/pipeline/topic/vaa_sns.go

57 lines
1.5 KiB
Go

package topic
import (
"context"
"encoding/json"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
pipelineAlert "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns"
"go.uber.org/zap"
)
// SQS represents a VAA queue in SNS.
type SNS struct {
producer *sns.Producer
alertClient alert.AlertClient
metrics metrics.Metrics
logger *zap.Logger
}
// NewVAASNS creates a VAA topic in SNS instances.
func NewVAASNS(producer *sns.Producer, alertClient alert.AlertClient, metrics metrics.Metrics, logger *zap.Logger) *SNS {
s := &SNS{
producer: producer,
alertClient: alertClient,
metrics: metrics,
logger: logger,
}
return s
}
// Publish sends the message to a SNS topic.
func (s *SNS) Publish(ctx context.Context, message *Event) error {
body, err := json.Marshal(message)
if err != nil {
return err
}
s.logger.Debug("Publishing message", zap.String("groupID", message.ID))
err = s.producer.SendMessage(ctx, message.ChainID, message.ID, message.ID, string(body))
if err == nil {
s.metrics.IncVaaSendNotification(message.ChainID)
} else {
// Alert error pushing event.
alertContext := alert.AlertContext{
Details: map[string]string{
"groupID": message.ID,
"messageID": message.ID,
},
Error: err,
}
s.alertClient.CreateAndSend(ctx, pipelineAlert.ErrorPushEventSNS, alertContext)
}
return err
}