dc4bc/storage/kafkaStorage.go

167 lines
3.7 KiB
Go

package storage
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
"gopkg.in/matryer/try.v1"
)
// Tuning constants for the Kafka-backed storage.
const (
// kafkaPartition is the partition number passed both to the leader dial and
// to the reader configuration in connect.
kafkaPartition = 0
// maxRetries is the value assigned to try.MaxRetries in init.
maxRetries = 30
// reconnectInterval is how long Send and GetMessages sleep inside their
// retry callbacks.
reconnectInterval = time.Second
)
// init overrides the try package's MaxRetries variable with maxRetries.
// NOTE(review): try.MaxRetries is a variable of the try package itself, so this
// presumably changes the limit for every try.Do caller in the process, not just
// this file — confirm that is intended.
func init() {
try.MaxRetries = maxRetries
}
// KafkaStorage keeps a Kafka connection for writing and a Kafka reader for
// reading on one topic. NewKafkaStorage returns it as a Storage.
type KafkaStorage struct {
// ctx is the context supplied to NewKafkaStorage; connect passes it to the
// leader dial.
ctx context.Context
// writer is the leader connection used by send to write messages.
writer *kafka.Conn
// reader is the reader used by getMessages to fetch messages.
reader *kafka.Reader
// kafkaEndpoint is the address used both as the dial target and as the
// single entry in the reader's broker list.
kafkaEndpoint string
// kafkaTopic is the topic both the writer and the reader operate on.
kafkaTopic string
}
// NewKafkaStorage builds a KafkaStorage for the given endpoint and topic and
// opens its writer and reader before returning it.
//
// ctx is stored on the storage and used when dialing the partition leader.
// A connection failure is returned wrapped, together with a nil Storage.
func NewKafkaStorage(ctx context.Context, kafkaEndpoint, kafkaTopic string) (Storage, error) {
	ks := new(KafkaStorage)
	ks.ctx = ctx
	ks.kafkaEndpoint = kafkaEndpoint
	ks.kafkaTopic = kafkaTopic

	connectErr := ks.connect()
	if connectErr != nil {
		return nil, fmt.Errorf("failed to connect: %w", connectErr)
	}

	return ks, nil
}
// Send writes m to Kafka, reconnecting and retrying when an attempt fails.
//
// After a failed attempt the storage reconnects and, if another attempt will
// follow, waits reconnectInterval first. At most try.MaxRetries attempts are
// made. On success the Message returned by send is returned with a nil error;
// on failure the last error is returned together with whatever send returned
// on the final attempt.
func (s *KafkaStorage) Send(m Message) (Message, error) {
	var sent Message
	err := try.Do(func(attempt int) (bool, error) {
		var err error
		// Keep the result apart from m: send returns an empty Message when
		// the write fails, and storing that back into m would make every
		// following retry publish the empty value instead of the caller's
		// message.
		sent, err = s.send(m)
		if err == nil {
			// Nothing to wait for after a successful write.
			return false, nil
		}

		log.Printf("failed while trying to send message (%v), trying to reconnect", err)
		if reconnectErr := s.connect(); reconnectErr != nil {
			log.Printf("failed to reconnect (%v), %d retries left", reconnectErr, try.MaxRetries-attempt)
		}

		retry := attempt < try.MaxRetries
		if retry {
			// Back off only when another attempt is actually coming.
			time.Sleep(reconnectInterval)
		}
		return retry, err
	})
	return sent, err
}
// send performs a single write attempt: it JSON-encodes m and writes it to the
// leader connection, using m.ID as the Kafka message key.
//
// A one-second write deadline is set before the write. On success m is
// returned unchanged. If encoding fails, m is returned alongside the error;
// if setting the deadline or writing fails, an empty Message is returned
// alongside the error. All errors are wrapped with %w so callers can inspect
// the underlying cause with errors.Is / errors.As.
func (s *KafkaStorage) send(m Message) (Message, error) {
	data, err := json.Marshal(m)
	if err != nil {
		// Wrapped with %w (same rendered text as %v) to stay consistent with
		// the other error paths in this file.
		return m, fmt.Errorf("failed to marshal a message %v: %w", m, err)
	}

	if err := s.writer.SetWriteDeadline(time.Now().Add(time.Second)); err != nil {
		return Message{}, fmt.Errorf("failed to SetWriteDeadline: %w", err)
	}

	if _, err := s.writer.WriteMessages(kafka.Message{Key: []byte(m.ID), Value: data}); err != nil {
		return Message{}, fmt.Errorf("failed to WriteMessages: %w", err)
	}

	return m, nil
}
// GetMessages fetches messages starting at offset, reconnecting and retrying
// when an attempt fails.
//
// After a failed attempt the storage reconnects and, if another attempt will
// follow, waits reconnectInterval first. At most try.MaxRetries attempts are
// made. On success the fetched messages are returned with a nil error; on
// failure messages is nil and the last error is returned.
func (s *KafkaStorage) GetMessages(offset uint64) (messages []Message, err error) {
	err = try.Do(func(attempt int) (bool, error) {
		fetched, fetchErr := s.getMessages(offset)
		if fetchErr == nil {
			// Return immediately on success instead of sleeping first.
			messages = fetched
			return false, nil
		}

		log.Printf("failed while trying to getMessages (%v), trying to reconnect", fetchErr)
		if reconnectErr := s.connect(); reconnectErr != nil {
			log.Printf("failed to reconnect (%v), %d retries left", reconnectErr, try.MaxRetries-attempt)
		}

		retry := attempt < try.MaxRetries
		if retry {
			// Back off only when another attempt is actually coming.
			time.Sleep(reconnectInterval)
		}
		return retry, fetchErr
	})
	return messages, err
}
// getMessages performs a single fetch attempt: it positions the reader at
// offset, asks for the current lag, and reads up to that many messages.
//
// Each Kafka message value is JSON-decoded into a Message whose Offset field
// is then set from the Kafka message offset. An error is returned if the
// offset cannot be set, the lag cannot be read, or a payload fails to decode.
// A read error stops the loop and the messages collected so far are returned
// with a nil error (best-effort, as before), but the error is now logged.
//
// NOTE(review): the reads use context.Background(), so nothing here bounds how
// long ReadMessage may wait — confirm whether s.ctx should be used instead.
func (s *KafkaStorage) getMessages(offset uint64) ([]Message, error) {
	if err := s.reader.SetOffset(int64(offset)); err != nil {
		return nil, fmt.Errorf("failed to SetOffset: %w", err)
	}

	lag, err := s.reader.ReadLag(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to ReadLag: %w", err)
	}

	var messages []Message
	for i := int64(0); i < lag; i++ {
		kafkaMessage, err := s.reader.ReadMessage(context.Background())
		if err != nil {
			log.Printf("failed to ReadMessage (%v), returning %d of %d messages", err, len(messages), lag)
			break
		}

		// Decode into a fresh value on every iteration: json.Unmarshal leaves
		// fields that are absent from the payload untouched, so a reused
		// variable would carry the previous message's values into this one.
		var message Message
		if err := json.Unmarshal(kafkaMessage.Value, &message); err != nil {
			return nil, fmt.Errorf("failed to unmarshal a message %s: %w",
				string(kafkaMessage.Value), err)
		}
		message.Offset = uint64(kafkaMessage.Offset)
		messages = append(messages, message)
	}

	return messages, nil
}
// Close closes the writer connection and the reader, skipping whichever of
// them is nil.
//
// Both are always attempted: a failure to close the writer no longer prevents
// the reader from being closed. If only one close fails, its error is returned
// wrapped. If both fail, the writer error is wrapped and the reader error is
// included in the message text. Returns nil when everything closed cleanly or
// there was nothing to close.
func (s *KafkaStorage) Close() error {
	var writerErr, readerErr error

	if s.writer != nil {
		writerErr = s.writer.Close()
	}
	// Deliberately not gated on writerErr, so the reader is never left open
	// just because the writer close failed.
	if s.reader != nil {
		readerErr = s.reader.Close()
	}

	switch {
	case writerErr != nil && readerErr != nil:
		return fmt.Errorf("failed to close writer: %w (failed to close reader: %v)", writerErr, readerErr)
	case writerErr != nil:
		return fmt.Errorf("failed to close writer: %w", writerErr)
	case readerErr != nil:
		return fmt.Errorf("failed to close reader: %w", readerErr)
	}
	return nil
}
// connect (re)establishes the storage's Kafka handles.
//
// Any existing writer and reader are closed first, then a leader connection is
// dialed for the configured topic and partition and a new reader is created
// for the same endpoint, topic and partition. The handles on s are replaced
// only after both have been created. A dial failure is returned wrapped.
func (s *KafkaStorage) connect() error {
	// Best-effort teardown: an error from closing the old handles is ignored
	// because they are about to be replaced anyway.
	_ = s.Close()

	leaderConn, dialErr := kafka.DialLeader(s.ctx, "tcp", s.kafkaEndpoint, s.kafkaTopic, kafkaPartition)
	if dialErr != nil {
		return fmt.Errorf("failed to init Kafka client: %w", dialErr)
	}

	readerCfg := kafka.ReaderConfig{
		Brokers:   []string{s.kafkaEndpoint},
		Topic:     s.kafkaTopic,
		Partition: kafkaPartition,
		MaxWait:   time.Second,
	}
	topicReader := kafka.NewReader(readerCfg)

	s.writer = leaderConn
	s.reader = topicReader
	return nil
}