mirror of https://github.com/certusone/dc4bc.git
lock file, offset, id in Message, Post -> Send
This commit is contained in:
parent
363d085bcb
commit
cc3becf5e7
2
go.mod
2
go.mod
|
@ -3,6 +3,8 @@ module p2p.org/dc4bc
|
|||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b
|
||||
github.com/makiuchi-d/gozxing v0.0.0-20190830103442-eaff64b1ceb7
|
||||
github.com/mattn/go-gtk v0.0.0-20191030024613-af2e013261f5
|
||||
github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f // indirect
|
||||
|
|
|
@ -2,60 +2,110 @@ package storage
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"github.com/google/uuid"
|
||||
"github.com/juju/fslock"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var _ Storage = (*FileStorage)(nil)
|
||||
|
||||
type FileStorage struct {
|
||||
sync.Mutex
|
||||
file *os.File
|
||||
reader *bufio.Reader
|
||||
lockFile *fslock.Lock
|
||||
|
||||
dataFile *os.File
|
||||
reader *bufio.Reader
|
||||
}
|
||||
|
||||
const (
|
||||
EOL = '\n'
|
||||
EOL = '\n'
|
||||
defaultLockFile = "/tmp/dc4bc_storage_lock"
|
||||
)
|
||||
|
||||
func InitFileStorage(filename string) (Storage, error) {
|
||||
func countLines(r io.Reader) (uint64, error) {
|
||||
var count uint64
|
||||
buf := make([]byte, bufio.MaxScanTokenSize)
|
||||
for {
|
||||
bufferSize, err := r.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var buffPosition int
|
||||
for {
|
||||
i := bytes.IndexByte(buf[buffPosition:], EOL)
|
||||
if i == -1 || bufferSize == buffPosition {
|
||||
break
|
||||
}
|
||||
buffPosition += i + 1
|
||||
count++
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// InitFileStorage inits append-only file storage
|
||||
// It takes two arguments: filename - path to a data file, lockFilename (optional) - path to a lock file
|
||||
func InitFileStorage(filename string, lockFilename ...string) (Storage, error) {
|
||||
var (
|
||||
fs FileStorage
|
||||
err error
|
||||
)
|
||||
if fs.file, err = os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil {
|
||||
if len(lockFilename) > 0 {
|
||||
fs.lockFile = fslock.New(lockFilename[0])
|
||||
} else {
|
||||
fs.lockFile = fslock.New(defaultLockFile)
|
||||
}
|
||||
|
||||
if fs.dataFile, err = os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs.reader = bufio.NewReader(fs.file)
|
||||
fs.reader = bufio.NewReader(fs.dataFile)
|
||||
return &fs, nil
|
||||
}
|
||||
|
||||
func (fs *FileStorage) Post(m Message) error {
|
||||
fs.Lock()
|
||||
defer fs.Unlock()
|
||||
// Send sends a message to an append-only data file, returns a message with offset and id
|
||||
func (fs *FileStorage) Send(m Message) (Message, error) {
|
||||
var (
|
||||
err error
|
||||
)
|
||||
if err = fs.lockFile.Lock(); err != nil {
|
||||
return m, err
|
||||
}
|
||||
defer fs.lockFile.Unlock()
|
||||
|
||||
m.ID = uuid.New().String()
|
||||
|
||||
if _, err = fs.dataFile.Seek(0, 0); err != nil { // otherwise countLines will return zero
|
||||
return m, err
|
||||
}
|
||||
if m.Offset, err = countLines(fs.dataFile); err != nil {
|
||||
return m, err
|
||||
}
|
||||
data, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
return m, err
|
||||
}
|
||||
data = append(data, EOL)
|
||||
_, err = fs.file.Write(data)
|
||||
return err
|
||||
_, err = fs.dataFile.Write(data)
|
||||
return m, err
|
||||
}
|
||||
|
||||
// GetMessages returns a slice of messages from append-only data file with given offset
|
||||
func (fs *FileStorage) GetMessages(offset int) ([]Message, error) {
|
||||
fs.Lock()
|
||||
defer fs.Unlock()
|
||||
|
||||
var (
|
||||
msgs []Message
|
||||
err error
|
||||
row []byte
|
||||
data Message
|
||||
)
|
||||
if _, err = fs.file.Seek(0, 0); err != nil {
|
||||
if _, err = fs.dataFile.Seek(0, 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for {
|
||||
|
@ -81,5 +131,5 @@ func (fs *FileStorage) GetMessages(offset int) ([]Message, error) {
|
|||
}
|
||||
|
||||
func (fs *FileStorage) Close() error {
|
||||
return fs.file.Close()
|
||||
return fs.dataFile.Close()
|
||||
}
|
||||
|
|
|
@ -30,10 +30,11 @@ func TestFileStorage_GetMessages(t *testing.T) {
|
|||
Data: randomBytes(10),
|
||||
Signature: randomBytes(10),
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
if err = fs.Post(msg); err != nil {
|
||||
msg, err = fs.Send(msg)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
offsetMsgs, err := fs.GetMessages(offset)
|
||||
if err != nil {
|
||||
|
|
|
@ -3,10 +3,12 @@ package storage
|
|||
type Message struct {
|
||||
Data []byte `json:"data"`
|
||||
Signature []byte `json:"signature"`
|
||||
ID string `json:"id"`
|
||||
Offset uint64 `json:"offset"`
|
||||
}
|
||||
|
||||
type Storage interface {
|
||||
Post(message Message) error
|
||||
Send(message Message) (Message, error)
|
||||
GetMessages(offset int) ([]Message, error)
|
||||
Close() error
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue