dc4bc/storage/kafkaStorage_test.go

47 lines
854 B
Go
Raw Normal View History

2020-09-07 03:30:03 -07:00
package storage
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
// kafkacat -C -b localhost -t messages
2020-09-09 06:29:18 -07:00
func TestKafkaStorage_GetMessages(t *testing.T) {
2020-09-07 03:30:03 -07:00
if testing.Short() {
t.Skip("skipping long test")
}
2020-09-09 06:29:18 -07:00
N := 10
var offset uint64 = 5
2020-09-07 03:30:03 -07:00
req := require.New(t)
2020-10-05 10:07:02 -07:00
stg, err := NewKafkaStorage(context.Background(), "localhost:9092", "test_topic")
2020-09-07 03:30:03 -07:00
req.NoError(err)
2020-09-09 06:29:18 -07:00
msgs := make([]Message, 0, N)
for i := 0; i < N; i++ {
msg := Message{
Data: randomBytes(10),
Signature: randomBytes(10),
}
msg, err = stg.Send(msg)
if err != nil {
t.Error(err)
}
msgs = append(msgs, msg)
}
offsetMsgs, err := stg.GetMessages(offset)
if err != nil {
t.Error(err)
}
expectedOffsetMsgs := msgs[offset:]
for idx, msg := range expectedOffsetMsgs {
req.Equal(msg.Signature, offsetMsgs[idx].Signature)
}
2020-09-07 03:30:03 -07:00
}