// Mirror of https://github.com/certusone/dc4bc.git (Go, 197 lines, 4.6 KiB).
package client
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
)
|
|
|
|
// Keys under which the client state is stored in LevelDB.
const (
	// offsetKey holds the offset, written as 8 little-endian bytes.
	offsetKey = "offset"

	// operationsKey holds the JSON-encoded map of operations, keyed by ID.
	operationsKey = "operations"

	// fsmStateKey holds the raw FSM state bytes.
	fsmStateKey = "fsm_state"
)
|
|
|
|
type State interface {
|
|
SaveOffset(uint64) error
|
|
LoadOffset() (uint64, error)
|
|
|
|
SaveFSM([]byte) error
|
|
LoadFSM() ([]byte, error)
|
|
|
|
PutOperation(operation *Operation) error
|
|
DeleteOperation(operationID string) error
|
|
GetOperations() (map[string]*Operation, error)
|
|
GetOperationByID(operationID string) (*Operation, error)
|
|
}
|
|
|
|
type LevelDBState struct {
|
|
sync.Mutex
|
|
stateDb *leveldb.DB
|
|
}
|
|
|
|
func NewLevelDBState(stateDbPath string) (State, error) {
|
|
db, err := leveldb.OpenFile(stateDbPath, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open stateDB: %w", err)
|
|
}
|
|
|
|
state := &LevelDBState{
|
|
stateDb: db,
|
|
}
|
|
|
|
// Init state key for operations JSON.
|
|
if err := state.initJsonKey(operationsKey, map[string]*Operation{}); err != nil {
|
|
return nil, fmt.Errorf("failed to init %s storage: %w", operationsKey, err)
|
|
}
|
|
|
|
// Init state key for offset bytes.
|
|
bz := make([]byte, 8)
|
|
binary.LittleEndian.PutUint64(bz, 0)
|
|
if err := db.Put([]byte(offsetKey), bz, nil); err != nil {
|
|
return nil, fmt.Errorf("failed to init %s storage: %w", offsetKey, err)
|
|
}
|
|
|
|
return state, nil
|
|
}
|
|
|
|
func (s *LevelDBState) initJsonKey(key string, data interface{}) error {
|
|
if _, err := s.stateDb.Get([]byte(key), nil); err != nil {
|
|
operationsBz, err := json.Marshal(data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal storage structure: %w", err)
|
|
}
|
|
err = s.stateDb.Put([]byte(key), operationsBz, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to init state: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *LevelDBState) SaveOffset(offset uint64) error {
|
|
bz := make([]byte, 8)
|
|
binary.LittleEndian.PutUint64(bz, offset)
|
|
|
|
if err := s.stateDb.Put([]byte(offsetKey), bz, nil); err != nil {
|
|
return fmt.Errorf("failed to set offset: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *LevelDBState) LoadOffset() (uint64, error) {
|
|
bz, err := s.stateDb.Get([]byte(offsetKey), nil)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to read offset: %w", err)
|
|
}
|
|
|
|
offset := binary.LittleEndian.Uint64(bz)
|
|
return offset, nil
|
|
}
|
|
|
|
// TODO: implement.
|
|
func (s *LevelDBState) SaveFSM(fsmState []byte) error {
|
|
if err := s.stateDb.Put([]byte(fsmStateKey), fsmState, nil); err != nil {
|
|
return fmt.Errorf("failed to save fsm state: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TODO: implement.
|
|
func (s *LevelDBState) LoadFSM() ([]byte, error) {
|
|
return s.stateDb.Get([]byte(fsmStateKey), nil)
|
|
}
|
|
|
|
func (s *LevelDBState) PutOperation(operation *Operation) error {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
operations, err := s.getOperations()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to getOperations: %w", err)
|
|
}
|
|
|
|
if _, ok := operations[operation.ID]; ok {
|
|
return fmt.Errorf("operation %s already exists", operation.ID)
|
|
}
|
|
|
|
operations[operation.ID] = operation
|
|
operationsJSON, err := json.Marshal(operations)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal operations: %w", err)
|
|
}
|
|
|
|
if err := s.stateDb.Put([]byte(operationsKey), operationsJSON, nil); err != nil {
|
|
return fmt.Errorf("failed to put operations: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *LevelDBState) DeleteOperation(operationID string) error {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
operations, err := s.getOperations()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to getOperations: %w", err)
|
|
}
|
|
|
|
delete(operations, operationID)
|
|
|
|
operationsJSON, err := json.Marshal(operations)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal operations: %w", err)
|
|
}
|
|
|
|
if err := s.stateDb.Put([]byte(operationsKey), operationsJSON, nil); err != nil {
|
|
return fmt.Errorf("failed to put operations: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *LevelDBState) GetOperations() (map[string]*Operation, error) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
return s.getOperations()
|
|
}
|
|
|
|
func (s *LevelDBState) GetOperationByID(operationID string) (*Operation, error) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
operations, err := s.getOperations()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to getOperations: %w", err)
|
|
}
|
|
|
|
operation, ok := operations[operationID]
|
|
if !ok {
|
|
return nil, errors.New("operation not found")
|
|
}
|
|
|
|
return operation, nil
|
|
}
|
|
|
|
func (s *LevelDBState) getOperations() (map[string]*Operation, error) {
|
|
bz, err := s.stateDb.Get([]byte(operationsKey), nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get Operations (key: %s): %w", operationsKey, err)
|
|
}
|
|
|
|
var operations map[string]*Operation
|
|
if err := json.Unmarshal(bz, &operations); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal Operations: %w", err)
|
|
}
|
|
|
|
return operations, nil
|
|
}
|