Fix issue where buffered writes may split a line to two files

This commit is contained in:
Jae Kwon 2016-11-20 17:19:15 -08:00
parent dc8fa06e64
commit d1848762cf
2 changed files with 34 additions and 29 deletions

View File

@ -1,10 +1,11 @@
package autofile package autofile
import ( import (
. "github.com/tendermint/go-common"
"os" "os"
"sync" "sync"
"time" "time"
. "github.com/tendermint/go-common"
) )
/* AutoFile usage /* AutoFile usage
@ -87,12 +88,15 @@ func (af *AutoFile) closeFile() (err error) {
func (af *AutoFile) Write(b []byte) (n int, err error) { func (af *AutoFile) Write(b []byte) (n int, err error) {
af.mtx.Lock() af.mtx.Lock()
defer af.mtx.Unlock() defer af.mtx.Unlock()
if af.file == nil { if af.file == nil {
if err = af.openFile(); err != nil { if err = af.openFile(); err != nil {
return return
} }
} }
return af.file.Write(b)
n, err = af.file.Write(b)
return
} }
func (af *AutoFile) Sync() error { func (af *AutoFile) Sync() error {

View File

@ -3,7 +3,6 @@ package autofile
import ( import (
"bufio" "bufio"
"errors" "errors"
"fmt"
"io" "io"
"os" "os"
"path" "path"
@ -52,7 +51,8 @@ const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
type Group struct { type Group struct {
ID string ID string
Head *AutoFile // The head AutoFile to write to Head *AutoFile // The head AutoFile to write to
Dir string // Directory that contains .Head headBuf *bufio.Writer
Dir string // Directory that contains .Head
ticker *time.Ticker ticker *time.Ticker
mtx sync.Mutex mtx sync.Mutex
headSizeLimit int64 headSizeLimit int64
@ -70,6 +70,7 @@ func OpenGroup(head *AutoFile) (g *Group, err error) {
g = &Group{ g = &Group{
ID: "group:" + head.ID, ID: "group:" + head.ID,
Head: head, Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir, Dir: dir,
ticker: time.NewTicker(groupCheckDuration), ticker: time.NewTicker(groupCheckDuration),
headSizeLimit: defaultHeadSizeLimit, headSizeLimit: defaultHeadSizeLimit,
@ -114,9 +115,10 @@ func (g *Group) MaxIndex() int {
} }
// Auto appends "\n" // Auto appends "\n"
// NOTE: Writes are buffered so they don't write synchronously
// TODO: Make it halt if space is unavailable // TODO: Make it halt if space is unavailable
func (g *Group) WriteLine(line string) error { func (g *Group) WriteLine(line string) error {
_, err := g.Head.Write([]byte(line + "\n")) _, err := g.headBuf.Write([]byte(line + "\n"))
return err return err
} }
@ -329,7 +331,7 @@ GROUP_LOOP:
} }
// Scan each line and test whether line matches // Scan each line and test whether line matches
for { for {
line, err := r.ReadLineInCurrent() line, err := r.ReadLine()
if err == io.EOF { if err == io.EOF {
if found { if found {
return match, found, nil return match, found, nil
@ -343,6 +345,13 @@ GROUP_LOOP:
match = line match = line
found = true found = true
} }
if r.CurIndex() > i {
if found {
return match, found, nil
} else {
continue GROUP_LOOP
}
}
} }
} }
@ -461,21 +470,11 @@ func (gr *GroupReader) Close() error {
} }
} }
// Reads a line (without delimiter)
// just return io.EOF if no new lines found.
func (gr *GroupReader) ReadLine() (string, error) { func (gr *GroupReader) ReadLine() (string, error) {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
return gr.readLineWithOptions(false)
}
func (gr *GroupReader) ReadLineInCurrent() (string, error) {
gr.mtx.Lock()
defer gr.mtx.Unlock()
return gr.readLineWithOptions(true)
}
// curFileOnly: if True, do not open new files,
// just return io.EOF if no new lines found.
func (gr *GroupReader) readLineWithOptions(curFileOnly bool) (string, error) {
// From PushLine // From PushLine
if gr.curLine != nil { if gr.curLine != nil {
@ -493,23 +492,25 @@ func (gr *GroupReader) readLineWithOptions(curFileOnly bool) (string, error) {
} }
// Iterate over files until line is found // Iterate over files until line is found
var linePrefix string
for { for {
bytes, err := gr.curReader.ReadBytes('\n') bytesRead, err := gr.curReader.ReadBytes('\n')
if err != nil { if err == io.EOF {
if err != io.EOF { // Open the next file
return "", err err := gr.openFile(gr.curIndex + 1)
} else if curFileOnly { if err != nil {
return "", err return "", err
}
if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
} else { } else {
// Open the next file linePrefix += string(bytesRead)
err := gr.openFile(gr.curIndex + 1)
if err != nil {
return "", err
}
continue continue
} }
} else if err != nil {
return "", err
} }
return string(bytes), nil return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
} }
} }