Implement batch operations

This commit is contained in:
Christopher Goes 2018-05-08 16:38:39 +02:00
parent 45514a6013
commit 0b6d101c77
No known key found for this signature in database
GPG Key ID: E828D98232D328D3
5 changed files with 294 additions and 43 deletions

View File

@ -169,3 +169,29 @@ func (s *server) Stats(context.Context, *protodb.Nothing) (*protodb.Stats, error
stats := s.db.Stats()
return &protodb.Stats{Data: stats, TimeAt: time.Now().Unix()}, nil
func (s *server) BatchWrite(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) {
return s.batchWrite(c, b, false)
func (s *server) BatchWriteSync(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) {
return s.batchWrite(c, b, true)
func (s *server) batchWrite(c context.Context, b *protodb.Batch, sync bool) (*protodb.Nothing, error) {
bat := s.db.NewBatch()
for _, op := range b.Ops {
switch op.Type {
case protodb.Operation_SET:
bat.Set(op.Entity.Key, op.Entity.Value)
case protodb.Operation_DELETE:
if sync {
} else {
return nothing, nil

View File

@ -8,6 +8,8 @@ It is generated from these files:
It has these top-level messages:
@ -37,6 +39,67 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Operation_Type int32
const (
Operation_SET Operation_Type = 0
Operation_DELETE Operation_Type = 1
var Operation_Type_name = map[int32]string{
0: "SET",
1: "DELETE",
var Operation_Type_value = map[string]int32{
"SET": 0,
"DELETE": 1,
func (x Operation_Type) String() string {
return proto.EnumName(Operation_Type_name, int32(x))
func (Operation_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 0} }
type Batch struct {
Ops []*Operation `protobuf:"bytes,1,rep,name=ops" json:"ops,omitempty"`
func (m *Batch) Reset() { *m = Batch{} }
func (m *Batch) String() string { return proto.CompactTextString(m) }
func (*Batch) ProtoMessage() {}
func (*Batch) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Batch) GetOps() []*Operation {
if m != nil {
return m.Ops
return nil
type Operation struct {
Entity *Entity `protobuf:"bytes,1,opt,name=entity" json:"entity,omitempty"`
Type Operation_Type `protobuf:"varint,2,opt,name=type,enum=protodb.Operation_Type" json:"type,omitempty"`
func (m *Operation) Reset() { *m = Operation{} }
func (m *Operation) String() string { return proto.CompactTextString(m) }
func (*Operation) ProtoMessage() {}
func (*Operation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Operation) GetEntity() *Entity {
if m != nil {
return m.Entity
return nil
func (m *Operation) GetType() Operation_Type {
if m != nil {
return m.Type
return Operation_SET
type Entity struct {
Id int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
@ -51,7 +114,7 @@ type Entity struct {
func (m *Entity) Reset() { *m = Entity{} }
func (m *Entity) String() string { return proto.CompactTextString(m) }
func (*Entity) ProtoMessage() {}
func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *Entity) GetId() int32 {
if m != nil {
@ -115,7 +178,7 @@ type Nothing struct {
func (m *Nothing) Reset() { *m = Nothing{} }
func (m *Nothing) String() string { return proto.CompactTextString(m) }
func (*Nothing) ProtoMessage() {}
func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
type Domain struct {
Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
@ -125,7 +188,7 @@ type Domain struct {
func (m *Domain) Reset() { *m = Domain{} }
func (m *Domain) String() string { return proto.CompactTextString(m) }
func (*Domain) ProtoMessage() {}
func (*Domain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (*Domain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *Domain) GetStart() []byte {
if m != nil {
@ -151,7 +214,7 @@ type Iterator struct {
func (m *Iterator) Reset() { *m = Iterator{} }
func (m *Iterator) String() string { return proto.CompactTextString(m) }
func (*Iterator) ProtoMessage() {}
func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *Iterator) GetDomain() *Domain {
if m != nil {
@ -189,7 +252,7 @@ type Stats struct {
func (m *Stats) Reset() { *m = Stats{} }
func (m *Stats) String() string { return proto.CompactTextString(m) }
func (*Stats) ProtoMessage() {}
func (*Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (*Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *Stats) GetData() map[string]string {
if m != nil {
@ -214,7 +277,7 @@ type Init struct {
func (m *Init) Reset() { *m = Init{} }
func (m *Init) String() string { return proto.CompactTextString(m) }
func (*Init) ProtoMessage() {}
func (*Init) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (*Init) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *Init) GetType() string {
if m != nil {
@ -238,12 +301,15 @@ func (m *Init) GetDir() string {
func init() {
proto.RegisterType((*Batch)(nil), "protodb.Batch")
proto.RegisterType((*Operation)(nil), "protodb.Operation")
proto.RegisterType((*Entity)(nil), "protodb.Entity")
proto.RegisterType((*Nothing)(nil), "protodb.Nothing")
proto.RegisterType((*Domain)(nil), "protodb.Domain")
proto.RegisterType((*Iterator)(nil), "protodb.Iterator")
proto.RegisterType((*Stats)(nil), "protodb.Stats")
proto.RegisterType((*Init)(nil), "protodb.Init")
proto.RegisterEnum("protodb.Operation_Type", Operation_Type_name, Operation_Type_value)
// Reference imports to suppress errors if they are not otherwise used.
@ -269,6 +335,8 @@ type DBClient interface {
ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error)
// rpc print(Nothing) returns (Entity) {}
Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error)
BatchWrite(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error)
BatchWriteSync(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error)
type dBClient struct {
@ -446,6 +514,24 @@ func (c *dBClient) Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOpti
return out, nil
func (c *dBClient) BatchWrite(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/batchWrite", in, out,, opts...)
if err != nil {
return nil, err
return out, nil
func (c *dBClient) BatchWriteSync(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/batchWriteSync", in, out,, opts...)
if err != nil {
return nil, err
return out, nil
// Server API for DB service
type DBServer interface {
@ -461,6 +547,8 @@ type DBServer interface {
ReverseIterator(*Entity, DB_ReverseIteratorServer) error
// rpc print(Nothing) returns (Entity) {}
Stats(context.Context, *Nothing) (*Stats, error)
BatchWrite(context.Context, *Batch) (*Nothing, error)
BatchWriteSync(context.Context, *Batch) (*Nothing, error)
func RegisterDBServer(s *grpc.Server, srv DBServer) {
@ -679,6 +767,42 @@ func _DB_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{
return interceptor(ctx, in, info, handler)
func _DB_BatchWrite_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Batch)
if err := dec(in); err != nil {
return nil, err
if interceptor == nil {
return srv.(DBServer).BatchWrite(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/BatchWrite",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).BatchWrite(ctx, req.(*Batch))
return interceptor(ctx, in, info, handler)
func _DB_BatchWriteSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Batch)
if err := dec(in); err != nil {
return nil, err
if interceptor == nil {
return srv.(DBServer).BatchWriteSync(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/BatchWriteSync",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).BatchWriteSync(ctx, req.(*Batch))
return interceptor(ctx, in, info, handler)
var _DB_serviceDesc = grpc.ServiceDesc{
ServiceName: "protodb.DB",
HandlerType: (*DBServer)(nil),
@ -715,6 +839,14 @@ var _DB_serviceDesc = grpc.ServiceDesc{
MethodName: "stats",
Handler: _DB_Stats_Handler,
MethodName: "batchWrite",
Handler: _DB_BatchWrite_Handler,
MethodName: "batchWriteSync",
Handler: _DB_BatchWriteSync_Handler,
Streams: []grpc.StreamDesc{
@ -740,37 +872,43 @@ var _DB_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("defs.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 498 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xce, 0xda, 0x8e, 0x13, 0x4f, 0xa1, 0x2d, 0x2b, 0x04, 0xab, 0x4a, 0x48, 0x96, 0x2f, 0x98,
0x3f, 0x2b, 0xa4, 0x07, 0x7e, 0x4e, 0x14, 0xa5, 0x87, 0x5c, 0x7a, 0x70, 0xb8, 0xa3, 0x6d, 0x3d,
0xa4, 0x2b, 0x1a, 0xbb, 0xec, 0x0e, 0x15, 0x7e, 0x02, 0x1e, 0x80, 0x27, 0xe1, 0x0d, 0xd1, 0xae,
0x7f, 0x42, 0x69, 0x0e, 0xe6, 0xe4, 0x99, 0xd9, 0xef, 0xfb, 0x66, 0xf6, 0xf3, 0x2c, 0x40, 0x81,
0x5f, 0x4c, 0x76, 0xad, 0x2b, 0xaa, 0xf8, 0xc4, 0x7d, 0x8a, 0xf3, 0xe4, 0x37, 0x83, 0xf0, 0xb4,
0x24, 0x45, 0x35, 0xdf, 0x07, 0x4f, 0x15, 0x82, 0xc5, 0x2c, 0x1d, 0xe7, 0x9e, 0x2a, 0xf8, 0x21,
0xf8, 0x5f, 0xb1, 0x16, 0x5e, 0xcc, 0xd2, 0x7b, 0xb9, 0x0d, 0xf9, 0x43, 0x18, 0xdf, 0xc8, 0xab,
0xef, 0x28, 0x7c, 0x57, 0x6b, 0x12, 0xfe, 0x08, 0x42, 0xfc, 0xa1, 0x0c, 0x19, 0x11, 0xc4, 0x2c,
0x9d, 0xe6, 0x6d, 0x66, 0xd1, 0x86, 0xa4, 0x26, 0x31, 0x6e, 0xd0, 0x2e, 0xb1, 0xaa, 0x58, 0x16,
0x22, 0x6c, 0x54, 0xb1, 0x74, 0x7d, 0x50, 0x6b, 0x31, 0x89, 0x59, 0x1a, 0xe5, 0x36, 0xe4, 0x4f,
0x00, 0x2e, 0x34, 0x4a, 0xc2, 0xe2, 0xb3, 0x24, 0x31, 0x8d, 0x59, 0xea, 0xe7, 0x51, 0x5b, 0x39,
0xa1, 0x24, 0x82, 0xc9, 0x59, 0x45, 0x97, 0xaa, 0x5c, 0x27, 0x33, 0x08, 0x17, 0xd5, 0x46, 0xaa,
0x72, 0xdb, 0x8d, 0xed, 0xe8, 0xe6, 0xf5, 0xdd, 0x92, 0x6f, 0x30, 0x5d, 0x12, 0x6a, 0x49, 0x95,
0xe6, 0x4f, 0x21, 0x2c, 0x1c, 0xdb, 0x91, 0xf6, 0xe6, 0x07, 0x59, 0x6b, 0x4b, 0xd6, 0x88, 0xe6,
0xed, 0x71, 0x7b, 0x71, 0xd5, 0x08, 0x4d, 0xf3, 0x26, 0xe9, 0x0c, 0xf2, 0x77, 0x18, 0x14, 0xfc,
0x65, 0x50, 0xf2, 0x93, 0xc1, 0x78, 0x45, 0x92, 0x0c, 0x7f, 0x09, 0x41, 0x21, 0x49, 0x0a, 0x16,
0xfb, 0xe9, 0xde, 0x5c, 0xf4, 0xed, 0xdc, 0x69, 0xb6, 0x90, 0x24, 0x4f, 0x4b, 0xd2, 0x75, 0xee,
0x50, 0xfc, 0x31, 0x4c, 0x48, 0x6d, 0xd0, 0x7a, 0xe0, 0x39, 0x0f, 0x42, 0x9b, 0x9e, 0xd0, 0xd1,
0x1b, 0x88, 0x7a, 0x6c, 0x37, 0x05, 0x6b, 0xec, 0xbb, 0x35, 0x85, 0xe7, 0x6a, 0x4d, 0xf2, 0xde,
0x7b, 0xcb, 0x92, 0x0f, 0x10, 0x2c, 0x4b, 0x45, 0x9c, 0x43, 0xf0, 0xa9, 0xbe, 0xc6, 0x96, 0xe4,
0x62, 0x5b, 0x3b, 0x93, 0x9b, 0x8e, 0xe4, 0x62, 0xab, 0xbd, 0x50, 0xda, 0xdd, 0x30, 0xca, 0x6d,
0x38, 0xff, 0x15, 0x80, 0xb7, 0xf8, 0xc8, 0x53, 0x08, 0x94, 0x15, 0xba, 0xdf, 0x5f, 0xc1, 0xea,
0x1e, 0x6d, 0x0d, 0x6c, 0x76, 0x2a, 0x19, 0xf1, 0x67, 0xe0, 0xaf, 0x91, 0xf8, 0xbf, 0x27, 0xbb,
0xa0, 0xc7, 0x10, 0xad, 0x91, 0x56, 0xa4, 0x51, 0x6e, 0x86, 0x10, 0x52, 0x36, 0x63, 0x56, 0xff,
0x52, 0x9a, 0x41, 0xfa, 0xcf, 0xc1, 0x37, 0xbb, 0x46, 0x39, 0xec, 0x0b, 0xdd, 0x5a, 0x8d, 0x78,
0x06, 0x13, 0x83, 0xb4, 0xaa, 0xcb, 0x8b, 0x61, 0xf8, 0x57, 0x10, 0x16, 0x78, 0x85, 0x84, 0xc3,
0xe0, 0xaf, 0xed, 0x6b, 0xb4, 0xf0, 0xe1, 0x1d, 0xe6, 0x30, 0x55, 0xdd, 0xe2, 0xde, 0x21, 0x3c,
0xd8, 0xfe, 0x87, 0x16, 0x93, 0x8c, 0x66, 0x8c, 0xbf, 0x83, 0x03, 0x8d, 0x37, 0xa8, 0x0d, 0x2e,
0xff, 0x97, 0xfa, 0xc2, 0xbd, 0x27, 0x32, 0xfc, 0xce, 0x2c, 0x47, 0xfb, 0xb7, 0xf7, 0x36, 0x19,
0x9d, 0x87, 0xae, 0x70, 0xfc, 0x27, 0x00, 0x00, 0xff, 0xff, 0x4d, 0xfe, 0x6a, 0xcc, 0x63, 0x04,
0x00, 0x00,
// 606 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4f, 0x6f, 0xd3, 0x4e,
0x10, 0xcd, 0xda, 0x8e, 0x13, 0x4f, 0x7f, 0xbf, 0x34, 0x8c, 0x10, 0xb5, 0x8a, 0x90, 0x22, 0x0b,
0x09, 0x43, 0x69, 0x14, 0x52, 0x24, 0xfe, 0x9c, 0x68, 0x95, 0x1c, 0x2a, 0xa1, 0x22, 0x39, 0x95,
0x38, 0xa2, 0x6d, 0x3d, 0x34, 0x2b, 0x1a, 0x3b, 0xac, 0x87, 0x8a, 0x5c, 0xb8, 0xf2, 0x79, 0xf8,
0x7c, 0x5c, 0xd0, 0xae, 0x1d, 0x87, 0x36, 0x39, 0x84, 0x53, 0x76, 0x66, 0xde, 0x7b, 0xb3, 0xf3,
0x32, 0x5e, 0x80, 0x94, 0x3e, 0x17, 0xfd, 0xb9, 0xce, 0x39, 0xc7, 0x96, 0xfd, 0x49, 0x2f, 0xa2,
0x43, 0x68, 0x9e, 0x48, 0xbe, 0x9c, 0xe2, 0x63, 0x70, 0xf3, 0x79, 0x11, 0x8a, 0x9e, 0x1b, 0xef,
0x0c, 0xb1, 0x5f, 0xd5, 0xfb, 0x1f, 0xe6, 0xa4, 0x25, 0xab, 0x3c, 0x4b, 0x4c, 0x39, 0xfa, 0x01,
0x41, 0x9d, 0xc1, 0x27, 0xe0, 0x53, 0xc6, 0x8a, 0x17, 0xa1, 0xe8, 0x89, 0x78, 0x67, 0xb8, 0x5b,
0xb3, 0xc6, 0x36, 0x9d, 0x54, 0x65, 0x3c, 0x00, 0x8f, 0x17, 0x73, 0x0a, 0x9d, 0x9e, 0x88, 0x3b,
0xc3, 0xbd, 0x75, 0xf1, 0xfe, 0xf9, 0x62, 0x4e, 0x89, 0x05, 0x45, 0x0f, 0xc1, 0x33, 0x11, 0xb6,
0xc0, 0x9d, 0x8c, 0xcf, 0xbb, 0x0d, 0x04, 0xf0, 0x47, 0xe3, 0xf7, 0xe3, 0xf3, 0x71, 0x57, 0x44,
0xbf, 0x04, 0xf8, 0xa5, 0x38, 0x76, 0xc0, 0x51, 0xa9, 0xed, 0xdc, 0x4c, 0x1c, 0x95, 0x62, 0x17,
0xdc, 0x2f, 0xb4, 0xb0, 0x3d, 0xfe, 0x4b, 0xcc, 0x11, 0xef, 0x43, 0xf3, 0x46, 0x5e, 0x7f, 0xa3,
0xd0, 0xb5, 0xb9, 0x32, 0xc0, 0x07, 0xe0, 0xd3, 0x77, 0x55, 0x70, 0x11, 0x7a, 0x3d, 0x11, 0xb7,
0x93, 0x2a, 0x32, 0xe8, 0x82, 0xa5, 0xe6, 0xb0, 0x59, 0xa2, 0x6d, 0x60, 0x54, 0x29, 0x4b, 0x43,
0xbf, 0x54, 0xa5, 0xcc, 0xf6, 0x21, 0xad, 0xc3, 0x56, 0x4f, 0xc4, 0x41, 0x62, 0x8e, 0xf8, 0x08,
0xe0, 0x52, 0x93, 0x64, 0x4a, 0x3f, 0x49, 0x0e, 0xdb, 0x3d, 0x11, 0xbb, 0x49, 0x50, 0x65, 0x8e,
0x39, 0x0a, 0xa0, 0x75, 0x96, 0xf3, 0x54, 0x65, 0x57, 0xd1, 0x00, 0xfc, 0x51, 0x3e, 0x93, 0x2a,
0x5b, 0x75, 0x13, 0x1b, 0xba, 0x39, 0x75, 0xb7, 0xe8, 0x2b, 0xb4, 0x4f, 0xd9, 0xb8, 0x94, 0x6b,
0xe3, 0x77, 0x6a, 0xd9, 0x6b, 0x7e, 0x97, 0xa2, 0x49, 0x55, 0xae, 0x06, 0x57, 0xa5, 0x50, 0x3b,
0x29, 0x83, 0xa5, 0x41, 0xee, 0x06, 0x83, 0xbc, 0xbf, 0x0c, 0x8a, 0x7e, 0x0a, 0x68, 0x4e, 0x58,
0x72, 0x81, 0xcf, 0xc1, 0x4b, 0x25, 0xcb, 0x6a, 0x29, 0xc2, 0xba, 0x9d, 0xad, 0xf6, 0x47, 0x92,
0xe5, 0x38, 0x63, 0xbd, 0x48, 0x2c, 0x0a, 0xf7, 0xa0, 0xc5, 0x6a, 0x46, 0xc6, 0x03, 0xc7, 0x7a,
0xe0, 0x9b, 0xf0, 0x98, 0xf7, 0x5f, 0x41, 0x50, 0x63, 0x97, 0xb7, 0x10, 0xa5, 0x7d, 0xb7, 0x6e,
0xe1, 0xd8, 0x5c, 0x19, 0xbc, 0x75, 0x5e, 0x8b, 0xe8, 0x1d, 0x78, 0xa7, 0x99, 0x62, 0xc4, 0x72,
0x25, 0x2a, 0x52, 0xb9, 0x1e, 0x08, 0xde, 0x99, 0x9c, 0x2d, 0x49, 0xf6, 0x6c, 0xb4, 0x47, 0x4a,
0xdb, 0x09, 0x83, 0xc4, 0x1c, 0x87, 0xbf, 0x3d, 0x70, 0x46, 0x27, 0x18, 0x83, 0xa7, 0x8c, 0xd0,
0xff, 0xf5, 0x08, 0x46, 0x77, 0xff, 0xee, 0xc2, 0x46, 0x0d, 0x7c, 0x0a, 0xee, 0x15, 0x31, 0xde,
0xad, 0x6c, 0x82, 0x1e, 0x41, 0x70, 0x45, 0x3c, 0x61, 0x4d, 0x72, 0xb6, 0x0d, 0x21, 0x16, 0x03,
0x61, 0xf4, 0xa7, 0xb2, 0xd8, 0x4a, 0xff, 0x19, 0xb8, 0xc5, 0xa6, 0xab, 0x74, 0xeb, 0xc4, 0x72,
0xad, 0x1a, 0xd8, 0x87, 0x56, 0x41, 0x3c, 0x59, 0x64, 0x97, 0xdb, 0xe1, 0x0f, 0xc1, 0x4f, 0xe9,
0x9a, 0x98, 0xb6, 0x83, 0xbf, 0x30, 0x8f, 0x87, 0x81, 0x6f, 0xdf, 0x61, 0x08, 0x6d, 0xb5, 0x5c,
0xdc, 0x35, 0xc2, 0xbd, 0xd5, 0xff, 0x50, 0x61, 0xa2, 0xc6, 0x40, 0xe0, 0x1b, 0xd8, 0xd5, 0x74,
0x43, 0xba, 0xa0, 0xd3, 0x7f, 0xa5, 0x1e, 0xd8, 0xef, 0x89, 0x0b, 0x5c, 0xbb, 0xcb, 0x7e, 0xe7,
0xf6, 0xde, 0x46, 0x0d, 0x1c, 0x00, 0x5c, 0x98, 0x47, 0xef, 0xa3, 0x56, 0x4c, 0xb8, 0xaa, 0xdb,
0x97, 0x70, 0xe3, 0x34, 0x2f, 0xa1, 0xb3, 0x62, 0x58, 0x13, 0xb6, 0x60, 0x5d, 0xf8, 0x36, 0x75,
0xf4, 0x27, 0x00, 0x00, 0xff, 0xff, 0x95, 0xf4, 0xe3, 0x82, 0x7a, 0x05, 0x00, 0x00,

View File

@ -2,6 +2,19 @@ syntax = "proto3";
package protodb;
message Batch {
repeated Operation ops = 1;
message Operation {
Entity entity = 1;
enum Type {
SET = 0;
Type type = 2;
message Entity {
int32 id = 1;
bytes key = 2;
@ -53,4 +66,6 @@ service DB {
rpc reverseIterator(Entity) returns (stream Iterator) {}
// rpc print(Nothing) returns (Entity) {}
rpc stats(Nothing) returns (Stats) {}
rpc batchWrite(Batch) returns (Nothing) {}
rpc batchWriteSync(Batch) returns (Nothing) {}

View File

@ -90,9 +90,11 @@ func (rd *RemoteDB) ReverseIterator(start, end []byte) db.Iterator {
return makeReverseIterator(dic)
// TODO: Implement NewBatch
func (rd *RemoteDB) NewBatch() db.Batch {
return &batch{
db: rd,
ops: nil,
// TODO: Implement Print when db.DB implements a method
@ -218,5 +220,43 @@ func (itr *iterator) Value() []byte {
func (itr *iterator) Close() {
// TODO: Shut down the iterator
err := itr.dic.CloseSend()
if err != nil {
panic(fmt.Sprintf("Error closing iterator: %v", err))
type batch struct {
db *RemoteDB
ops []*protodb.Operation
var _ db.Batch = (*batch)(nil)
func (bat *batch) Set(key, value []byte) {
op := &protodb.Operation{
Entity: &protodb.Entity{Key: key, Value: value},
Type: protodb.Operation_SET,
bat.ops = append(bat.ops, op)
func (bat *batch) Delete(key []byte) {
op := &protodb.Operation{
Entity: &protodb.Entity{Key: key},
Type: protodb.Operation_DELETE,
bat.ops = append(bat.ops, op)
func (bat *batch) Write() {
if _, err := bat.db.dc.BatchWrite(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil {
panic(fmt.Sprintf("RemoteDB.BatchWrite: %v", err))
func (bat *batch) WriteSync() {
if _, err := bat.db.dc.BatchWriteSync(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil {
panic(fmt.Sprintf("RemoteDB.BatchWriteSync: %v", err))

View File

@ -60,6 +60,7 @@ func TestRemoteDB(t *testing.T) {
require.Equal(t, itr.Key(), []byte("key-2"))
require.Equal(t, itr.Value(), []byte("value-2"))
require.Panics(t, itr.Next)
// Deletion
@ -69,5 +70,36 @@ func TestRemoteDB(t *testing.T) {
require.Equal(t, len(gv2), 0, "after deletion, not expecting the key to exist anymore")
require.Equal(t, len(gv1), 0, "after deletion, not expecting the key to exist anymore")
// TODO Batch tests
// Batch tests - set
k3 := []byte("key-3")
k4 := []byte("key-4")
k5 := []byte("key-5")
v3 := []byte("value-3")
v4 := []byte("value-4")
v5 := []byte("value-5")
bat := client.NewBatch()
bat.Set(k3, v3)
bat.Set(k4, v4)
rv3 := client.Get(k3)
require.Equal(t, 0, len(rv3), "expecting no k3 to have been stored")
rv4 := client.Get(k4)
require.Equal(t, 0, len(rv4), "expecting no k4 to have been stored")
rv3 = client.Get(k3)
require.Equal(t, rv3, v3, "expecting k3 to have been stored")
rv4 = client.Get(k4)
require.Equal(t, rv4, v4, "expecting k4 to have been stored")
// Batch tests - set and delete
bat = client.NewBatch()
bat.Set(k5, v5)
rv3 = client.Get(k3)
require.Equal(t, 0, len(rv3), "expecting k3 to have been deleted")
rv4 = client.Get(k4)
require.Equal(t, 0, len(rv4), "expecting k4 to have been deleted")
rv5 := client.Get(k5)
require.Equal(t, rv5, v5, "expecting k5 to have been stored")