storage SendBatch()

This commit is contained in:
programmer10110 2020-11-16 13:34:24 +03:00
parent 21847a3de0
commit db25008c58
7 changed files with 150 additions and 6 deletions

View File

@ -354,7 +354,7 @@ func (c *BaseClient) handleProcessedOperation(operation types.Operation) error {
return fmt.Errorf("processed operation does not match stored operation: %w", err)
}
for _, message := range operation.ResultMsgs {
for i, message := range operation.ResultMsgs {
message.SenderAddr = c.GetUsername()
sig, err := c.signMessage(message.Bytes())
@ -363,9 +363,11 @@ func (c *BaseClient) handleProcessedOperation(operation types.Operation) error {
}
message.Signature = sig
if _, err := c.storage.Send(message); err != nil {
return fmt.Errorf("failed to post message: %w", err)
}
operation.ResultMsgs[i] = message
}
if _, err := c.storage.SendBatch(operation.ResultMsgs...); err != nil {
return fmt.Errorf("failed to post messages: %w", err)
}
if err := c.state.DeleteOperation(operation.ID); err != nil {

View File

@ -48,6 +48,25 @@ func (mr *MockStorageMockRecorder) Send(message interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockStorage)(nil).Send), message)
}
// SendBatch mocks base method
func (m *MockStorage) SendBatch(messages ...storage.Message) ([]storage.Message, error) {
m.ctrl.T.Helper()
varargs := []interface{}{}
for _, a := range messages {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "SendBatch", varargs...)
ret0, _ := ret[0].([]storage.Message)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendBatch indicates an expected call of SendBatch
func (mr *MockStorageMockRecorder) SendBatch(messages ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendBatch", reflect.TypeOf((*MockStorage)(nil).SendBatch), messages...)
}
// GetMessages mocks base method
func (m *MockStorage) GetMessages(offset uint64) ([]storage.Message, error) {
m.ctrl.T.Helper()

View File

@ -81,6 +81,17 @@ func (fs *FileStorage) Send(m Message) (Message, error) {
return m, err
}
func (fs *FileStorage) SendBatch(msgs ...Message) ([]Message, error) {
var err error
for i, m := range msgs {
msgs[i], err = fs.Send(m)
if err != nil {
return msgs, err
}
}
return msgs, nil
}
// GetMessages returns a slice of messages from append-only data file with given offset
func (fs *FileStorage) GetMessages(offset uint64) ([]Message, error) {
var (

View File

@ -51,3 +51,39 @@ func TestFileStorage_GetMessages(t *testing.T) {
t.Errorf("expected messages: %v, actual messages: %v", expectedOffsetMsgs, offsetMsgs)
}
}
func TestFileStorage_SendBatch(t *testing.T) {
N := 10
var offset uint64 = 5
var testFile = "/tmp/dc4bc_test_file_storage"
fs, err := NewFileStorage(testFile)
if err != nil {
t.Error(err)
}
defer fs.Close()
defer os.Remove(testFile)
msgs := make([]Message, 0, N)
for i := 0; i < N; i++ {
msg := Message{
Data: randomBytes(10),
Signature: randomBytes(10),
}
msgs = append(msgs, msg)
}
sentMsgs, err := fs.SendBatch(msgs...)
if err != nil {
t.Error(err)
}
offsetMsgs, err := fs.GetMessages(offset)
if err != nil {
t.Error(err)
}
expectedOffsetMsgs := sentMsgs[offset:]
if !reflect.DeepEqual(offsetMsgs, expectedOffsetMsgs) {
t.Errorf("expected messages: %v, actual messages: %v", expectedOffsetMsgs, offsetMsgs)
}
}

View File

@ -62,6 +62,24 @@ func (s *KafkaStorage) Send(m Message) (Message, error) {
return m, err
}
func (s *KafkaStorage) SendBatch(msgs ...Message) ([]Message, error) {
err := try.Do(func(attempt int) (bool, error) {
var err error
msgs, err = s.sendBatch(msgs...)
if err != nil {
log.Printf("failed while trying to send message (%v), trying to reconnect", err)
if err := s.connect(); err != nil {
log.Printf("failed to reconnect (%v), %d retries left", err, try.MaxRetries-attempt)
}
}
time.Sleep(reconnectInterval)
return attempt < try.MaxRetries, err
})
return msgs, err
}
func (s *KafkaStorage) send(m Message) (Message, error) {
data, err := json.Marshal(m)
if err != nil {
@ -69,16 +87,37 @@ func (s *KafkaStorage) send(m Message) (Message, error) {
}
if err := s.writer.SetWriteDeadline(time.Now().Add(time.Second)); err != nil {
return Message{}, fmt.Errorf("failed to SetWriteDeadline: %w", err)
return m, fmt.Errorf("failed to SetWriteDeadline: %w", err)
}
if _, err := s.writer.WriteMessages(kafka.Message{Key: []byte(m.ID), Value: data}); err != nil {
return Message{}, fmt.Errorf("failed to WriteMessages: %w", err)
return m, fmt.Errorf("failed to WriteMessages: %w", err)
}
return m, nil
}
func (s *KafkaStorage) sendBatch(msgs ...Message) ([]Message, error) {
kafkaMessages := make([]kafka.Message, len(msgs))
for i, m := range msgs {
data, err := json.Marshal(m)
if err != nil {
return msgs, fmt.Errorf("failed to marshal a message %v: %v", m, err)
}
kafkaMessages[i] = kafka.Message{Key: []byte(m.ID), Value: data}
}
if err := s.writer.SetWriteDeadline(time.Now().Add(time.Second)); err != nil {
return msgs, fmt.Errorf("failed to SetWriteDeadline: %w", err)
}
if _, err := s.writer.WriteMessages(kafkaMessages...); err != nil {
return msgs, fmt.Errorf("failed to WriteMessages: %w", err)
}
return msgs, nil
}
func (s *KafkaStorage) GetMessages(offset uint64) (messages []Message, err error) {
err = try.Do(func(attempt int) (bool, error) {
var err error

View File

@ -44,3 +44,39 @@ func TestKafkaStorage_GetMessages(t *testing.T) {
req.Equal(msg.Signature, offsetMsgs[idx].Signature)
}
}
func TestKafkaStorage_SendBatch(t *testing.T) {
if testing.Short() {
t.Skip("skipping long test")
}
N := 10
var offset uint64 = 5
req := require.New(t)
stg, err := NewKafkaStorage(context.Background(), "localhost:9092", "test_topic")
req.NoError(err)
msgs := make([]Message, 0, N)
for i := 0; i < N; i++ {
msg := Message{
Data: randomBytes(10),
Signature: randomBytes(10),
}
msgs = append(msgs, msg)
}
sentMsgs, err := stg.SendBatch(msgs...)
req.NoError(err)
offsetMsgs, err := stg.GetMessages(offset)
if err != nil {
t.Error(err)
}
expectedOffsetMsgs := sentMsgs[offset:]
for idx, msg := range expectedOffsetMsgs {
req.Equal(msg.Signature, offsetMsgs[idx].Signature)
}
}

View File

@ -29,6 +29,7 @@ func (m *Message) Verify(pubKey ed25519.PublicKey) bool {
type Storage interface {
Send(message Message) (Message, error)
SendBatch(messages ...Message) ([]Message, error) //expected to be an atomic operation
GetMessages(offset uint64) ([]Message, error)
Close() error
}