mirror of https://github.com/certusone/dc4bc.git
simplify countLines, more informative errors
This commit is contained in:
parent
0eb587ca04
commit
42b085728f
|
@ -2,8 +2,8 @@ package storage
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/juju/fslock"
|
||||
"io"
|
||||
|
@ -24,30 +24,15 @@ const (
|
|||
defaultLockFile = "/tmp/dc4bc_storage_lock"
|
||||
)
|
||||
|
||||
func countLines(r io.Reader) (uint64, error) {
|
||||
func countLines(r io.Reader) uint64 {
|
||||
var count uint64
|
||||
buf := make([]byte, bufio.MaxScanTokenSize)
|
||||
for {
|
||||
bufferSize, err := r.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, err
|
||||
}
|
||||
fileScanner := bufio.NewScanner(r)
|
||||
|
||||
var buffPosition int
|
||||
for {
|
||||
i := bytes.IndexByte(buf[buffPosition:], EOL)
|
||||
if i == -1 || bufferSize == buffPosition {
|
||||
break
|
||||
}
|
||||
buffPosition += i + 1
|
||||
count++
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
for fileScanner.Scan() {
|
||||
count++
|
||||
}
|
||||
|
||||
return count, nil
|
||||
return count
|
||||
}
|
||||
|
||||
// InitFileStorage inits append-only file storage
|
||||
|
@ -64,7 +49,7 @@ func InitFileStorage(filename string, lockFilename ...string) (Storage, error) {
|
|||
}
|
||||
|
||||
if fs.dataFile, err = os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to open a data file: %v", err)
|
||||
}
|
||||
fs.reader = bufio.NewReader(fs.dataFile)
|
||||
return &fs, nil
|
||||
|
@ -73,27 +58,28 @@ func InitFileStorage(filename string, lockFilename ...string) (Storage, error) {
|
|||
// Send sends a message to an append-only data file, returns a message with offset and id
|
||||
func (fs *FileStorage) Send(m Message) (Message, error) {
|
||||
var (
|
||||
err error
|
||||
data []byte
|
||||
err error
|
||||
)
|
||||
if err = fs.lockFile.Lock(); err != nil {
|
||||
return m, err
|
||||
return m, fmt.Errorf("failed to lock a file: %v", err)
|
||||
}
|
||||
defer fs.lockFile.Unlock()
|
||||
|
||||
m.ID = uuid.New().String()
|
||||
|
||||
if _, err = fs.dataFile.Seek(0, 0); err != nil { // otherwise countLines will return zero
|
||||
return m, err
|
||||
return m, fmt.Errorf("failed to seek a offset to the start of a data file: %v", err)
|
||||
}
|
||||
if m.Offset, err = countLines(fs.dataFile); err != nil {
|
||||
return m, err
|
||||
m.Offset = countLines(fs.dataFile)
|
||||
|
||||
if data, err = json.Marshal(m); err != nil {
|
||||
return m, fmt.Errorf("failed to marshal a message %v: %v", m, err)
|
||||
}
|
||||
data, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return m, err
|
||||
|
||||
if _, err = fmt.Fprintln(fs.dataFile, string(data)); err != nil {
|
||||
return m, fmt.Errorf("failed to write a message to a data file: %v", err)
|
||||
}
|
||||
data = append(data, EOL)
|
||||
_, err = fs.dataFile.Write(data)
|
||||
return m, err
|
||||
}
|
||||
|
||||
|
@ -106,7 +92,7 @@ func (fs *FileStorage) GetMessages(offset int) ([]Message, error) {
|
|||
data Message
|
||||
)
|
||||
if _, err = fs.dataFile.Seek(0, 0); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to seek a offset to the start of a data file: %v", err)
|
||||
}
|
||||
for {
|
||||
row, err = fs.reader.ReadBytes(EOL)
|
||||
|
@ -125,7 +111,7 @@ func (fs *FileStorage) GetMessages(offset int) ([]Message, error) {
|
|||
msgs = append(msgs, data)
|
||||
}
|
||||
if err != io.EOF {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error while reading data file: %v", err)
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue