Merge pull request #65 from tendermint/573-wal-issues-2

[autofile] Support for the new WAL format
This commit is contained in:
Ethan Buchman 2017-10-25 22:00:36 -04:00 committed by GitHub
commit b30e3ba26d
2 changed files with 196 additions and 27 deletions

View File

@ -18,6 +18,13 @@ import (
. "github.com/tendermint/tmlibs/common" . "github.com/tendermint/tmlibs/common"
) )
const (
groupCheckDuration = 5000 * time.Millisecond
defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
maxFilesToRemove = 4 // needs to be greater than 1
)
/* /*
You can open a Group to keep restrictions on an AutoFile, like You can open a Group to keep restrictions on an AutoFile, like
the maximum size of each chunk, and/or the total amount of bytes the maximum size of each chunk, and/or the total amount of bytes
@ -25,33 +32,27 @@ stored in the group.
The first file to be written in the Group.Dir is the head file. The first file to be written in the Group.Dir is the head file.
Dir/ Dir/
- <HeadPath> - <HeadPath>
Once the Head file reaches the size limit, it will be rotated. Once the Head file reaches the size limit, it will be rotated.
Dir/ Dir/
- <HeadPath>.000 // First rolled file - <HeadPath>.000 // First rolled file
- <HeadPath> // New head path, starts empty. - <HeadPath> // New head path, starts empty.
// The implicit index is 001. // The implicit index is 001.
As more files are written, the index numbers grow... As more files are written, the index numbers grow...
Dir/ Dir/
- <HeadPath>.000 // First rolled file - <HeadPath>.000 // First rolled file
- <HeadPath>.001 // Second rolled file - <HeadPath>.001 // Second rolled file
- ... - ...
- <HeadPath> // New head path - <HeadPath> // New head path
The Group can also be used to binary-search for some line, The Group can also be used to binary-search for some line,
assuming that marker lines are written occasionally. assuming that marker lines are written occasionally.
*/ */
const groupCheckDuration = 5000 * time.Millisecond
const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
const maxFilesToRemove = 4 // needs to be greater than 1
type Group struct { type Group struct {
BaseService BaseService
@ -109,37 +110,61 @@ func (g *Group) OnStop() {
g.ticker.Stop() g.ticker.Stop()
} }
// SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
func (g *Group) SetHeadSizeLimit(limit int64) { func (g *Group) SetHeadSizeLimit(limit int64) {
g.mtx.Lock() g.mtx.Lock()
g.headSizeLimit = limit g.headSizeLimit = limit
g.mtx.Unlock() g.mtx.Unlock()
} }
// HeadSizeLimit returns the current head size limit.
func (g *Group) HeadSizeLimit() int64 { func (g *Group) HeadSizeLimit() int64 {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
return g.headSizeLimit return g.headSizeLimit
} }
// SetTotalSizeLimit allows you to overwrite default total size limit of the
// group - 1GB.
func (g *Group) SetTotalSizeLimit(limit int64) { func (g *Group) SetTotalSizeLimit(limit int64) {
g.mtx.Lock() g.mtx.Lock()
g.totalSizeLimit = limit g.totalSizeLimit = limit
g.mtx.Unlock() g.mtx.Unlock()
} }
// TotalSizeLimit returns total size limit of the group.
func (g *Group) TotalSizeLimit() int64 { func (g *Group) TotalSizeLimit() int64 {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
return g.totalSizeLimit return g.totalSizeLimit
} }
// MaxIndex returns index of the last file in the group.
func (g *Group) MaxIndex() int { func (g *Group) MaxIndex() int {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
return g.maxIndex return g.maxIndex
} }
// Auto appends "\n" // MinIndex returns index of the first file in the group.
func (g *Group) MinIndex() int {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.minIndex
}
// Write writes the contents of p into the current head of the group. It
// returns the number of bytes written. If nn < len(p), it also returns an
// error explaining why the write is short.
// NOTE: Writes are buffered so they don't write synchronously
// TODO: Make it halt if space is unavailable
func (g *Group) Write(p []byte) (nn int, err error) {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.headBuf.Write(p)
}
// WriteLine writes line into the current head of the group. It also appends "\n".
// NOTE: Writes are buffered so they don't write synchronously // NOTE: Writes are buffered so they don't write synchronously
// TODO: Make it halt if space is unavailable // TODO: Make it halt if space is unavailable
func (g *Group) WriteLine(line string) error { func (g *Group) WriteLine(line string) error {
@ -149,6 +174,8 @@ func (g *Group) WriteLine(line string) error {
return err return err
} }
// Flush writes any buffered data to the underlying file and commits the
// current content of the file to stable storage.
func (g *Group) Flush() error { func (g *Group) Flush() error {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
@ -223,6 +250,8 @@ func (g *Group) checkTotalSizeLimit() {
} }
} }
// RotateFile causes group to close the current head and assign it some index.
// Note it does not create a new head.
func (g *Group) RotateFile() { func (g *Group) RotateFile() {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
@ -241,8 +270,8 @@ func (g *Group) RotateFile() {
g.maxIndex += 1 g.maxIndex += 1
} }
// NOTE: if error, returns no GroupReader. // NewReader returns a new group reader.
// CONTRACT: Caller must close the returned GroupReader // CONTRACT: Caller must close the returned GroupReader.
func (g *Group) NewReader(index int) (*GroupReader, error) { func (g *Group) NewReader(index int) (*GroupReader, error) {
r := newGroupReader(g) r := newGroupReader(g)
err := r.SetIndex(index) err := r.SetIndex(index)
@ -423,14 +452,15 @@ GROUP_LOOP:
return return
} }
// GroupInfo holds information about the group.
type GroupInfo struct { type GroupInfo struct {
MinIndex int MinIndex int // index of the first file in the group, including head
MaxIndex int MaxIndex int // index of the last file in the group, including head
TotalSize int64 TotalSize int64 // total size of the group
HeadSize int64 HeadSize int64 // size of the head
} }
// Returns info after scanning all files in g.Head's dir // Returns info after scanning all files in g.Head's dir.
func (g *Group) ReadGroupInfo() GroupInfo { func (g *Group) ReadGroupInfo() GroupInfo {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
@ -505,6 +535,7 @@ func filePathForIndex(headPath string, index int, maxIndex int) string {
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// GroupReader provides an interface for reading from a Group.
type GroupReader struct { type GroupReader struct {
*Group *Group
mtx sync.Mutex mtx sync.Mutex
@ -524,6 +555,7 @@ func newGroupReader(g *Group) *GroupReader {
} }
} }
// Close closes the GroupReader by closing the cursor file.
func (gr *GroupReader) Close() error { func (gr *GroupReader) Close() error {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
@ -540,7 +572,48 @@ func (gr *GroupReader) Close() error {
} }
} }
// Reads a line (without delimiter) // Read implements io.Reader, reading bytes from the current Reader
// incrementing index until enough bytes are read.
func (gr *GroupReader) Read(p []byte) (n int, err error) {
lenP := len(p)
if lenP == 0 {
return 0, errors.New("given empty slice")
}
gr.mtx.Lock()
defer gr.mtx.Unlock()
// Open file if not open yet
if gr.curReader == nil {
if err = gr.openFile(gr.curIndex); err != nil {
return 0, err
}
}
// Iterate over files until enough bytes are read
var nn int
for {
nn, err = gr.curReader.Read(p[n:])
n += nn
if err == io.EOF {
// Open the next file
if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
return n, err1
}
if n >= lenP {
return n, nil
} else {
continue
}
} else if err != nil {
return n, err
} else if nn == 0 { // empty file
return n, err
}
}
}
// ReadLine reads a line (without delimiter).
// just return io.EOF if no new lines found. // just return io.EOF if no new lines found.
func (gr *GroupReader) ReadLine() (string, error) { func (gr *GroupReader) ReadLine() (string, error) {
gr.mtx.Lock() gr.mtx.Lock()
@ -613,6 +686,9 @@ func (gr *GroupReader) openFile(index int) error {
return nil return nil
} }
// PushLine makes the given line the current one, so the next time somebody
// calls ReadLine, this line will be returned.
// panics if called twice without calling ReadLine.
func (gr *GroupReader) PushLine(line string) { func (gr *GroupReader) PushLine(line string) {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
@ -624,13 +700,15 @@ func (gr *GroupReader) PushLine(line string) {
} }
} }
// Cursor's file index. // CurIndex returns cursor's file index.
func (gr *GroupReader) CurIndex() int { func (gr *GroupReader) CurIndex() int {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
return gr.curIndex return gr.curIndex
} }
// SetIndex sets the cursor's file index to index by opening a file at this
// position.
func (gr *GroupReader) SetIndex(index int) error { func (gr *GroupReader) SetIndex(index int) error {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()

View File

@ -1,6 +1,7 @@
package autofile package autofile
import ( import (
"bytes"
"errors" "errors"
"io" "io"
"io/ioutil" "io/ioutil"
@ -400,3 +401,93 @@ func TestFindLast4(t *testing.T) {
// Cleanup // Cleanup
destroyTestGroup(t, g) destroyTestGroup(t, g)
} }
func TestWrite(t *testing.T) {
g := createTestGroup(t, 0)
written := []byte("Medusa")
g.Write(written)
g.Flush()
read := make([]byte, len(written))
gr, err := g.NewReader(0)
if err != nil {
t.Fatalf("Failed to create reader: %v", err)
}
_, err = gr.Read(read)
if err != nil {
t.Fatalf("Failed to read data: %v", err)
}
if !bytes.Equal(written, read) {
t.Errorf("%s, %s should be equal", string(written), string(read))
}
// Cleanup
destroyTestGroup(t, g)
}
func TestGroupReaderRead(t *testing.T) {
g := createTestGroup(t, 0)
professor := []byte("Professor Monster")
g.Write(professor)
g.Flush()
g.RotateFile()
frankenstein := []byte("Frankenstein's Monster")
g.Write(frankenstein)
g.Flush()
totalWrittenLength := len(professor) + len(frankenstein)
read := make([]byte, totalWrittenLength)
gr, err := g.NewReader(0)
if err != nil {
t.Fatalf("Failed to create reader: %v", err)
}
n, err := gr.Read(read)
if err != nil {
t.Fatalf("Failed to read data: %v", err)
}
if n != totalWrittenLength {
t.Errorf("Failed to read enough bytes: wanted %d, but read %d", totalWrittenLength, n)
}
professorPlusFrankenstein := professor
professorPlusFrankenstein = append(professorPlusFrankenstein, frankenstein...)
if !bytes.Equal(read, professorPlusFrankenstein) {
t.Errorf("%s, %s should be equal", string(professorPlusFrankenstein), string(read))
}
// Cleanup
destroyTestGroup(t, g)
}
func TestMinIndex(t *testing.T) {
g := createTestGroup(t, 0)
if g.MinIndex() != 0 {
t.Error("MinIndex should be zero at the beginning")
}
// Cleanup
destroyTestGroup(t, g)
}
func TestMaxIndex(t *testing.T) {
g := createTestGroup(t, 0)
if g.MaxIndex() != 0 {
t.Error("MaxIndex should be zero at the beginning")
}
g.WriteLine("Line 1")
g.Flush()
g.RotateFile()
if g.MaxIndex() != 1 {
t.Error("MaxIndex should point to the last file")
}
// Cleanup
destroyTestGroup(t, g)
}