Merge remote-tracking branch 'autofile/master' into unstable
This commit is contained in:
commit
e01445ea94
|
@ -0,0 +1,2 @@
|
|||
*.swp
|
||||
*.swo
|
|
@ -0,0 +1,203 @@
|
|||
All files are Copyright (C) 2017 All in Bits, Inc
|
||||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "{}"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright {yyyy} {name of copyright owner}
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -0,0 +1,138 @@
|
|||
package autofile
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
)
|
||||
|
||||
/* AutoFile usage
|
||||
|
||||
// Create/Append to ./autofile_test
|
||||
af, err := OpenAutoFile("autofile_test")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Stream of writes.
|
||||
// During this time, the file may be moved e.g. by logRotate.
|
||||
for i := 0; i < 60; i++ {
|
||||
af.Write([]byte(Fmt("LOOP(%v)", i)))
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
// Close the AutoFile
|
||||
err = af.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
*/
|
||||
|
||||
const autoFileOpenDuration = 1000 * time.Millisecond
|
||||
|
||||
// Automatically closes and re-opens file for writing.
|
||||
// This is useful for using a log file with the logrotate tool.
|
||||
type AutoFile struct {
|
||||
ID string
|
||||
Path string
|
||||
ticker *time.Ticker
|
||||
mtx sync.Mutex
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func OpenAutoFile(path string) (af *AutoFile, err error) {
|
||||
af = &AutoFile{
|
||||
ID: RandStr(12) + ":" + path,
|
||||
Path: path,
|
||||
ticker: time.NewTicker(autoFileOpenDuration),
|
||||
}
|
||||
if err = af.openFile(); err != nil {
|
||||
return
|
||||
}
|
||||
go af.processTicks()
|
||||
sighupWatchers.addAutoFile(af)
|
||||
return
|
||||
}
|
||||
|
||||
func (af *AutoFile) Close() error {
|
||||
af.ticker.Stop()
|
||||
err := af.closeFile()
|
||||
sighupWatchers.removeAutoFile(af)
|
||||
return err
|
||||
}
|
||||
|
||||
func (af *AutoFile) processTicks() {
|
||||
for {
|
||||
_, ok := <-af.ticker.C
|
||||
if !ok {
|
||||
return // Done.
|
||||
}
|
||||
af.closeFile()
|
||||
}
|
||||
}
|
||||
|
||||
func (af *AutoFile) closeFile() (err error) {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
|
||||
file := af.file
|
||||
if file == nil {
|
||||
return nil
|
||||
}
|
||||
af.file = nil
|
||||
return file.Close()
|
||||
}
|
||||
|
||||
func (af *AutoFile) Write(b []byte) (n int, err error) {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
|
||||
if af.file == nil {
|
||||
if err = af.openFile(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n, err = af.file.Write(b)
|
||||
return
|
||||
}
|
||||
|
||||
func (af *AutoFile) Sync() error {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
|
||||
return af.file.Sync()
|
||||
}
|
||||
|
||||
func (af *AutoFile) openFile() error {
|
||||
file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
af.file = file
|
||||
return nil
|
||||
}
|
||||
|
||||
func (af *AutoFile) Size() (int64, error) {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
|
||||
if af.file == nil {
|
||||
err := af.openFile()
|
||||
if err != nil {
|
||||
if err == os.ErrNotExist {
|
||||
return 0, nil
|
||||
} else {
|
||||
return -1, err
|
||||
}
|
||||
}
|
||||
}
|
||||
stat, err := af.file.Stat()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return stat.Size(), nil
|
||||
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package autofile
|
||||
|
||||
import (
|
||||
. "github.com/tendermint/go-common"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSIGHUP(t *testing.T) {
|
||||
|
||||
// First, create an AutoFile writing to a tempfile dir
|
||||
file, name := Tempfile("sighup_test")
|
||||
err := file.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating tempfile: %v", err)
|
||||
}
|
||||
// Here is the actual AutoFile
|
||||
af, err := OpenAutoFile(name)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating autofile: %v", err)
|
||||
}
|
||||
|
||||
// Write to the file.
|
||||
_, err = af.Write([]byte("Line 1\n"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error writing to autofile: %v", err)
|
||||
}
|
||||
_, err = af.Write([]byte("Line 2\n"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error writing to autofile: %v", err)
|
||||
}
|
||||
|
||||
// Move the file over
|
||||
err = os.Rename(name, name+"_old")
|
||||
if err != nil {
|
||||
t.Fatalf("Error moving autofile: %v", err)
|
||||
}
|
||||
|
||||
// Send SIGHUP to self.
|
||||
oldSighupCounter := atomic.LoadInt32(&sighupCounter)
|
||||
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
|
||||
|
||||
// Wait a bit... signals are not handled synchronously.
|
||||
for atomic.LoadInt32(&sighupCounter) == oldSighupCounter {
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
// Write more to the file.
|
||||
_, err = af.Write([]byte("Line 3\n"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error writing to autofile: %v", err)
|
||||
}
|
||||
_, err = af.Write([]byte("Line 4\n"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error writing to autofile: %v", err)
|
||||
}
|
||||
err = af.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("Error closing autofile")
|
||||
}
|
||||
|
||||
// Both files should exist
|
||||
if body := MustReadFile(name + "_old"); string(body) != "Line 1\nLine 2\n" {
|
||||
t.Errorf("Unexpected body %s", body)
|
||||
}
|
||||
if body := MustReadFile(name); string(body) != "Line 3\nLine 4\n" {
|
||||
t.Errorf("Unexpected body %s", body)
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,663 @@
|
|||
package autofile
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
)
|
||||
|
||||
/*
|
||||
You can open a Group to keep restrictions on an AutoFile, like
|
||||
the maximum size of each chunk, and/or the total amount of bytes
|
||||
stored in the group.
|
||||
|
||||
The first file to be written in the Group.Dir is the head file.
|
||||
|
||||
Dir/
|
||||
- <HeadPath>
|
||||
|
||||
Once the Head file reaches the size limit, it will be rotated.
|
||||
|
||||
Dir/
|
||||
- <HeadPath>.000 // First rolled file
|
||||
- <HeadPath> // New head path, starts empty.
|
||||
// The implicit index is 001.
|
||||
|
||||
As more files are written, the index numbers grow...
|
||||
|
||||
Dir/
|
||||
- <HeadPath>.000 // First rolled file
|
||||
- <HeadPath>.001 // Second rolled file
|
||||
- ...
|
||||
- <HeadPath> // New head path
|
||||
|
||||
The Group can also be used to binary-search for some line,
|
||||
assuming that marker lines are written occasionally.
|
||||
*/
|
||||
|
||||
const groupCheckDuration = 5000 * time.Millisecond
|
||||
const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
|
||||
const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
|
||||
const maxFilesToRemove = 4 // needs to be greater than 1
|
||||
|
||||
type Group struct {
|
||||
BaseService
|
||||
|
||||
ID string
|
||||
Head *AutoFile // The head AutoFile to write to
|
||||
headBuf *bufio.Writer
|
||||
Dir string // Directory that contains .Head
|
||||
ticker *time.Ticker
|
||||
mtx sync.Mutex
|
||||
headSizeLimit int64
|
||||
totalSizeLimit int64
|
||||
minIndex int // Includes head
|
||||
maxIndex int // Includes head, where Head will move to
|
||||
|
||||
// TODO: When we start deleting files, we need to start tracking GroupReaders
|
||||
// and their dependencies.
|
||||
}
|
||||
|
||||
func OpenGroup(headPath string) (g *Group, err error) {
|
||||
|
||||
dir := path.Dir(headPath)
|
||||
head, err := OpenAutoFile(headPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
g = &Group{
|
||||
ID: "group:" + head.ID,
|
||||
Head: head,
|
||||
headBuf: bufio.NewWriterSize(head, 4096*10),
|
||||
Dir: dir,
|
||||
ticker: time.NewTicker(groupCheckDuration),
|
||||
headSizeLimit: defaultHeadSizeLimit,
|
||||
totalSizeLimit: defaultTotalSizeLimit,
|
||||
minIndex: 0,
|
||||
maxIndex: 0,
|
||||
}
|
||||
g.BaseService = *NewBaseService(nil, "Group", g)
|
||||
|
||||
gInfo := g.readGroupInfo()
|
||||
g.minIndex = gInfo.MinIndex
|
||||
g.maxIndex = gInfo.MaxIndex
|
||||
return
|
||||
}
|
||||
|
||||
func (g *Group) OnStart() error {
|
||||
g.BaseService.OnStart()
|
||||
go g.processTicks()
|
||||
return nil
|
||||
}
|
||||
|
||||
// NOTE: g.Head must be closed separately
|
||||
func (g *Group) OnStop() {
|
||||
g.BaseService.OnStop()
|
||||
g.ticker.Stop()
|
||||
return
|
||||
}
|
||||
|
||||
func (g *Group) SetHeadSizeLimit(limit int64) {
|
||||
g.mtx.Lock()
|
||||
g.headSizeLimit = limit
|
||||
g.mtx.Unlock()
|
||||
}
|
||||
|
||||
func (g *Group) HeadSizeLimit() int64 {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
return g.headSizeLimit
|
||||
}
|
||||
|
||||
func (g *Group) SetTotalSizeLimit(limit int64) {
|
||||
g.mtx.Lock()
|
||||
g.totalSizeLimit = limit
|
||||
g.mtx.Unlock()
|
||||
}
|
||||
|
||||
func (g *Group) TotalSizeLimit() int64 {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
return g.totalSizeLimit
|
||||
}
|
||||
|
||||
func (g *Group) MaxIndex() int {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
return g.maxIndex
|
||||
}
|
||||
|
||||
// Auto appends "\n"
|
||||
// NOTE: Writes are buffered so they don't write synchronously
|
||||
// TODO: Make it halt if space is unavailable
|
||||
func (g *Group) WriteLine(line string) error {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
_, err := g.headBuf.Write([]byte(line + "\n"))
|
||||
return err
|
||||
}
|
||||
|
||||
func (g *Group) Flush() error {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
return g.headBuf.Flush()
|
||||
}
|
||||
|
||||
func (g *Group) processTicks() {
|
||||
for {
|
||||
_, ok := <-g.ticker.C
|
||||
if !ok {
|
||||
return // Done.
|
||||
}
|
||||
g.checkHeadSizeLimit()
|
||||
g.checkTotalSizeLimit()
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: for testing
|
||||
func (g *Group) stopTicker() {
|
||||
g.ticker.Stop()
|
||||
}
|
||||
|
||||
// NOTE: this function is called manually in tests.
|
||||
func (g *Group) checkHeadSizeLimit() {
|
||||
limit := g.HeadSizeLimit()
|
||||
if limit == 0 {
|
||||
return
|
||||
}
|
||||
size, err := g.Head.Size()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if size >= limit {
|
||||
g.RotateFile()
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) checkTotalSizeLimit() {
|
||||
limit := g.TotalSizeLimit()
|
||||
if limit == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
gInfo := g.readGroupInfo()
|
||||
totalSize := gInfo.TotalSize
|
||||
for i := 0; i < maxFilesToRemove; i++ {
|
||||
index := gInfo.MinIndex + i
|
||||
if totalSize < limit {
|
||||
return
|
||||
}
|
||||
if index == gInfo.MaxIndex {
|
||||
// Special degenerate case, just do nothing.
|
||||
log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
|
||||
return
|
||||
}
|
||||
pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
|
||||
fileInfo, err := os.Stat(pathToRemove)
|
||||
if err != nil {
|
||||
log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
|
||||
continue
|
||||
}
|
||||
err = os.Remove(pathToRemove)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
totalSize -= fileInfo.Size()
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) RotateFile() {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
|
||||
dstPath := filePathForIndex(g.Head.Path, g.maxIndex, g.maxIndex+1)
|
||||
err := os.Rename(g.Head.Path, dstPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = g.Head.closeFile()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
g.maxIndex += 1
|
||||
}
|
||||
|
||||
// NOTE: if error, returns no GroupReader.
|
||||
// CONTRACT: Caller must close the returned GroupReader
|
||||
func (g *Group) NewReader(index int) (*GroupReader, error) {
|
||||
r := newGroupReader(g)
|
||||
err := r.SetIndex(index)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return r, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Returns -1 if line comes after, 0 if found, 1 if line comes before.
|
||||
type SearchFunc func(line string) (int, error)
|
||||
|
||||
// Searches for the right file in Group, then returns a GroupReader to start
|
||||
// streaming lines.
|
||||
// Returns true if an exact match was found, otherwise returns the next greater
|
||||
// line that starts with prefix.
|
||||
// CONTRACT: Caller must close the returned GroupReader
|
||||
func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
|
||||
g.mtx.Lock()
|
||||
minIndex, maxIndex := g.minIndex, g.maxIndex
|
||||
g.mtx.Unlock()
|
||||
// Now minIndex/maxIndex may change meanwhile,
|
||||
// but it shouldn't be a big deal
|
||||
// (maybe we'll want to limit scanUntil though)
|
||||
|
||||
for {
|
||||
curIndex := (minIndex + maxIndex + 1) / 2
|
||||
|
||||
// Base case, when there's only 1 choice left.
|
||||
if minIndex == maxIndex {
|
||||
r, err := g.NewReader(maxIndex)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
match, err := scanUntil(r, prefix, cmp)
|
||||
if err != nil {
|
||||
r.Close()
|
||||
return nil, false, err
|
||||
} else {
|
||||
return r, match, err
|
||||
}
|
||||
}
|
||||
|
||||
// Read starting roughly at the middle file,
|
||||
// until we find line that has prefix.
|
||||
r, err := g.NewReader(curIndex)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
foundIndex, line, err := scanNext(r, prefix)
|
||||
r.Close()
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Compare this line to our search query.
|
||||
val, err := cmp(line)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if val < 0 {
|
||||
// Line will come later
|
||||
minIndex = foundIndex
|
||||
} else if val == 0 {
|
||||
// Stroke of luck, found the line
|
||||
r, err := g.NewReader(foundIndex)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
match, err := scanUntil(r, prefix, cmp)
|
||||
if !match {
|
||||
panic("Expected match to be true")
|
||||
}
|
||||
if err != nil {
|
||||
r.Close()
|
||||
return nil, false, err
|
||||
} else {
|
||||
return r, true, err
|
||||
}
|
||||
} else {
|
||||
// We passed it
|
||||
maxIndex = curIndex - 1
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Scans and returns the first line that starts with 'prefix'
|
||||
// Consumes line and returns it.
|
||||
func scanNext(r *GroupReader, prefix string) (int, string, error) {
|
||||
for {
|
||||
line, err := r.ReadLine()
|
||||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
if !strings.HasPrefix(line, prefix) {
|
||||
continue
|
||||
}
|
||||
index := r.CurIndex()
|
||||
return index, line, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true iff an exact match was found.
|
||||
// Pushes line, does not consume it.
|
||||
func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
|
||||
for {
|
||||
line, err := r.ReadLine()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !strings.HasPrefix(line, prefix) {
|
||||
continue
|
||||
}
|
||||
val, err := cmp(line)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if val < 0 {
|
||||
continue
|
||||
} else if val == 0 {
|
||||
r.PushLine(line)
|
||||
return true, nil
|
||||
} else {
|
||||
r.PushLine(line)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Searches backwards for the last line in Group with prefix.
|
||||
// Scans each file forward until the end to find the last match.
|
||||
func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
|
||||
g.mtx.Lock()
|
||||
minIndex, maxIndex := g.minIndex, g.maxIndex
|
||||
g.mtx.Unlock()
|
||||
|
||||
r, err := g.NewReader(maxIndex)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
// Open files from the back and read
|
||||
GROUP_LOOP:
|
||||
for i := maxIndex; i >= minIndex; i-- {
|
||||
err := r.SetIndex(i)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
// Scan each line and test whether line matches
|
||||
for {
|
||||
line, err := r.ReadLine()
|
||||
if err == io.EOF {
|
||||
if found {
|
||||
return match, found, nil
|
||||
} else {
|
||||
continue GROUP_LOOP
|
||||
}
|
||||
} else if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
if strings.HasPrefix(line, prefix) {
|
||||
match = line
|
||||
found = true
|
||||
}
|
||||
if r.CurIndex() > i {
|
||||
if found {
|
||||
return match, found, nil
|
||||
} else {
|
||||
continue GROUP_LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type GroupInfo struct {
|
||||
MinIndex int
|
||||
MaxIndex int
|
||||
TotalSize int64
|
||||
HeadSize int64
|
||||
}
|
||||
|
||||
// Returns info after scanning all files in g.Head's dir
|
||||
func (g *Group) ReadGroupInfo() GroupInfo {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
return g.readGroupInfo()
|
||||
}
|
||||
|
||||
// Index includes the head.
|
||||
// CONTRACT: caller should have called g.mtx.Lock
|
||||
func (g *Group) readGroupInfo() GroupInfo {
|
||||
groupDir := filepath.Dir(g.Head.Path)
|
||||
headBase := filepath.Base(g.Head.Path)
|
||||
var minIndex, maxIndex int = -1, -1
|
||||
var totalSize, headSize int64 = 0, 0
|
||||
|
||||
dir, err := os.Open(groupDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer dir.Close()
|
||||
fiz, err := dir.Readdir(0)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// For each file in the directory, filter by pattern
|
||||
for _, fileInfo := range fiz {
|
||||
if fileInfo.Name() == headBase {
|
||||
fileSize := fileInfo.Size()
|
||||
totalSize += fileSize
|
||||
headSize = fileSize
|
||||
continue
|
||||
} else if strings.HasPrefix(fileInfo.Name(), headBase) {
|
||||
fileSize := fileInfo.Size()
|
||||
totalSize += fileSize
|
||||
indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
|
||||
submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
|
||||
if len(submatch) != 0 {
|
||||
// Matches
|
||||
fileIndex, err := strconv.Atoi(string(submatch[1]))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if maxIndex < fileIndex {
|
||||
maxIndex = fileIndex
|
||||
}
|
||||
if minIndex == -1 || fileIndex < minIndex {
|
||||
minIndex = fileIndex
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now account for the head.
|
||||
if minIndex == -1 {
|
||||
// If there were no numbered files,
|
||||
// then the head is index 0.
|
||||
minIndex, maxIndex = 0, 0
|
||||
} else {
|
||||
// Otherwise, the head file is 1 greater
|
||||
maxIndex += 1
|
||||
}
|
||||
return GroupInfo{minIndex, maxIndex, totalSize, headSize}
|
||||
}
|
||||
|
||||
func filePathForIndex(headPath string, index int, maxIndex int) string {
|
||||
if index == maxIndex {
|
||||
return headPath
|
||||
} else {
|
||||
return fmt.Sprintf("%v.%03d", headPath, index)
|
||||
}
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
type GroupReader struct {
|
||||
*Group
|
||||
mtx sync.Mutex
|
||||
curIndex int
|
||||
curFile *os.File
|
||||
curReader *bufio.Reader
|
||||
curLine []byte
|
||||
}
|
||||
|
||||
func newGroupReader(g *Group) *GroupReader {
|
||||
return &GroupReader{
|
||||
Group: g,
|
||||
curIndex: 0,
|
||||
curFile: nil,
|
||||
curReader: nil,
|
||||
curLine: nil,
|
||||
}
|
||||
}
|
||||
|
||||
func (gr *GroupReader) Close() error {
|
||||
gr.mtx.Lock()
|
||||
defer gr.mtx.Unlock()
|
||||
|
||||
if gr.curReader != nil {
|
||||
err := gr.curFile.Close()
|
||||
gr.curIndex = 0
|
||||
gr.curReader = nil
|
||||
gr.curFile = nil
|
||||
gr.curLine = nil
|
||||
return err
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Reads a line (without delimiter)
|
||||
// just return io.EOF if no new lines found.
|
||||
func (gr *GroupReader) ReadLine() (string, error) {
|
||||
gr.mtx.Lock()
|
||||
defer gr.mtx.Unlock()
|
||||
|
||||
// From PushLine
|
||||
if gr.curLine != nil {
|
||||
line := string(gr.curLine)
|
||||
gr.curLine = nil
|
||||
return line, nil
|
||||
}
|
||||
|
||||
// Open file if not open yet
|
||||
if gr.curReader == nil {
|
||||
err := gr.openFile(gr.curIndex)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate over files until line is found
|
||||
var linePrefix string
|
||||
for {
|
||||
bytesRead, err := gr.curReader.ReadBytes('\n')
|
||||
if err == io.EOF {
|
||||
// Open the next file
|
||||
err := gr.openFile(gr.curIndex + 1)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
|
||||
return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
|
||||
} else {
|
||||
linePrefix += string(bytesRead)
|
||||
continue
|
||||
}
|
||||
} else if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
|
||||
}
|
||||
}
|
||||
|
||||
// IF index > gr.Group.maxIndex, returns io.EOF
|
||||
// CONTRACT: caller should hold gr.mtx
|
||||
func (gr *GroupReader) openFile(index int) error {
|
||||
|
||||
// Lock on Group to ensure that head doesn't move in the meanwhile.
|
||||
gr.Group.mtx.Lock()
|
||||
defer gr.Group.mtx.Unlock()
|
||||
|
||||
if index > gr.Group.maxIndex {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
|
||||
curFile, err := os.Open(curFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
curReader := bufio.NewReader(curFile)
|
||||
|
||||
// Update gr.cur*
|
||||
if gr.curFile != nil {
|
||||
gr.curFile.Close() // TODO return error?
|
||||
}
|
||||
gr.curIndex = index
|
||||
gr.curFile = curFile
|
||||
gr.curReader = curReader
|
||||
gr.curLine = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gr *GroupReader) PushLine(line string) {
|
||||
gr.mtx.Lock()
|
||||
defer gr.mtx.Unlock()
|
||||
|
||||
if gr.curLine == nil {
|
||||
gr.curLine = []byte(line)
|
||||
} else {
|
||||
panic("PushLine failed, already have line")
|
||||
}
|
||||
}
|
||||
|
||||
// Cursor's file index.
|
||||
func (gr *GroupReader) CurIndex() int {
|
||||
gr.mtx.Lock()
|
||||
defer gr.mtx.Unlock()
|
||||
return gr.curIndex
|
||||
}
|
||||
|
||||
func (gr *GroupReader) SetIndex(index int) error {
|
||||
gr.mtx.Lock()
|
||||
defer gr.mtx.Unlock()
|
||||
return gr.openFile(index)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// A simple SearchFunc that assumes that the marker is of form
|
||||
// <prefix><number>.
|
||||
// For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
|
||||
//
|
||||
// #HEIGHT:1
|
||||
// ...
|
||||
// #HEIGHT:2
|
||||
// ...
|
||||
func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
|
||||
return func(line string) (int, error) {
|
||||
if !strings.HasPrefix(line, prefix) {
|
||||
return -1, errors.New(Fmt("Marker line did not have prefix: %v", prefix))
|
||||
}
|
||||
i, err := strconv.Atoi(line[len(prefix):])
|
||||
if err != nil {
|
||||
return -1, errors.New(Fmt("Failed to parse marker line: %v", err.Error()))
|
||||
}
|
||||
if target < i {
|
||||
return 1, nil
|
||||
} else if target == i {
|
||||
return 0, nil
|
||||
} else {
|
||||
return -1, nil
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,405 @@
|
|||
package autofile
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
)
|
||||
|
||||
// NOTE: Returned group has ticker stopped
|
||||
func createTestGroup(t *testing.T, headSizeLimit int64) *Group {
|
||||
testID := RandStr(12)
|
||||
testDir := "_test_" + testID
|
||||
err := EnsureDir(testDir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal("Error creating dir", err)
|
||||
}
|
||||
headPath := testDir + "/myfile"
|
||||
g, err := OpenGroup(headPath)
|
||||
if err != nil {
|
||||
t.Fatal("Error opening Group", err)
|
||||
}
|
||||
g.SetHeadSizeLimit(headSizeLimit)
|
||||
g.stopTicker()
|
||||
|
||||
if g == nil {
|
||||
t.Fatal("Failed to create Group")
|
||||
}
|
||||
return g
|
||||
}
|
||||
|
||||
func destroyTestGroup(t *testing.T, g *Group) {
|
||||
err := os.RemoveAll(g.Dir)
|
||||
if err != nil {
|
||||
t.Fatal("Error removing test Group directory", err)
|
||||
}
|
||||
}
|
||||
|
||||
func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, totalSize, headSize int64) {
|
||||
if gInfo.MinIndex != minIndex {
|
||||
t.Errorf("GroupInfo MinIndex expected %v, got %v", minIndex, gInfo.MinIndex)
|
||||
}
|
||||
if gInfo.MaxIndex != maxIndex {
|
||||
t.Errorf("GroupInfo MaxIndex expected %v, got %v", maxIndex, gInfo.MaxIndex)
|
||||
}
|
||||
if gInfo.TotalSize != totalSize {
|
||||
t.Errorf("GroupInfo TotalSize expected %v, got %v", totalSize, gInfo.TotalSize)
|
||||
}
|
||||
if gInfo.HeadSize != headSize {
|
||||
t.Errorf("GroupInfo HeadSize expected %v, got %v", headSize, gInfo.HeadSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckHeadSizeLimit(t *testing.T) {
|
||||
g := createTestGroup(t, 1000*1000)
|
||||
|
||||
// At first, there are no files.
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0)
|
||||
|
||||
// Write 1000 bytes 999 times.
|
||||
for i := 0; i < 999; i++ {
|
||||
err := g.WriteLine(RandStr(999))
|
||||
if err != nil {
|
||||
t.Fatal("Error appending to head", err)
|
||||
}
|
||||
}
|
||||
g.Flush()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
|
||||
|
||||
// Even calling checkHeadSizeLimit manually won't rotate it.
|
||||
g.checkHeadSizeLimit()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
|
||||
|
||||
// Write 1000 more bytes.
|
||||
err := g.WriteLine(RandStr(999))
|
||||
if err != nil {
|
||||
t.Fatal("Error appending to head", err)
|
||||
}
|
||||
g.Flush()
|
||||
|
||||
// Calling checkHeadSizeLimit this time rolls it.
|
||||
g.checkHeadSizeLimit()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1000000, 0)
|
||||
|
||||
// Write 1000 more bytes.
|
||||
err = g.WriteLine(RandStr(999))
|
||||
if err != nil {
|
||||
t.Fatal("Error appending to head", err)
|
||||
}
|
||||
g.Flush()
|
||||
|
||||
// Calling checkHeadSizeLimit does nothing.
|
||||
g.checkHeadSizeLimit()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1001000, 1000)
|
||||
|
||||
// Write 1000 bytes 999 times.
|
||||
for i := 0; i < 999; i++ {
|
||||
err := g.WriteLine(RandStr(999))
|
||||
if err != nil {
|
||||
t.Fatal("Error appending to head", err)
|
||||
}
|
||||
}
|
||||
g.Flush()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000)
|
||||
|
||||
// Calling checkHeadSizeLimit rolls it again.
|
||||
g.checkHeadSizeLimit()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2000000, 0)
|
||||
|
||||
// Write 1000 more bytes.
|
||||
_, err = g.Head.Write([]byte(RandStr(999) + "\n"))
|
||||
if err != nil {
|
||||
t.Fatal("Error appending to head", err)
|
||||
}
|
||||
g.Flush()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000)
|
||||
|
||||
// Calling checkHeadSizeLimit does nothing.
|
||||
g.checkHeadSizeLimit()
|
||||
assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000)
|
||||
|
||||
// Cleanup
|
||||
destroyTestGroup(t, g)
|
||||
}
|
||||
|
||||
func TestSearch(t *testing.T) {
|
||||
g := createTestGroup(t, 10*1000)
|
||||
|
||||
// Create some files in the group that have several INFO lines in them.
|
||||
// Try to put the INFO lines in various spots.
|
||||
for i := 0; i < 100; i++ {
|
||||
// The random junk at the end ensures that this INFO linen
|
||||
// is equally likely to show up at the end.
|
||||
_, err := g.Head.Write([]byte(Fmt("INFO %v %v\n", i, RandStr(123))))
|
||||
if err != nil {
|
||||
t.Error("Failed to write to head")
|
||||
}
|
||||
g.checkHeadSizeLimit()
|
||||
for j := 0; j < 10; j++ {
|
||||
_, err := g.Head.Write([]byte(RandStr(123) + "\n"))
|
||||
if err != nil {
|
||||
t.Error("Failed to write to head")
|
||||
}
|
||||
g.checkHeadSizeLimit()
|
||||
}
|
||||
}
|
||||
|
||||
// Create a search func that searches for line
|
||||
makeSearchFunc := func(target int) SearchFunc {
|
||||
return func(line string) (int, error) {
|
||||
parts := strings.Split(line, " ")
|
||||
if len(parts) != 3 {
|
||||
return -1, errors.New("Line did not have 3 parts")
|
||||
}
|
||||
i, err := strconv.Atoi(parts[1])
|
||||
if err != nil {
|
||||
return -1, errors.New("Failed to parse INFO: " + err.Error())
|
||||
}
|
||||
if target < i {
|
||||
return 1, nil
|
||||
} else if target == i {
|
||||
return 0, nil
|
||||
} else {
|
||||
return -1, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now search for each number
|
||||
for i := 0; i < 100; i++ {
|
||||
t.Log("Testing for i", i)
|
||||
gr, match, err := g.Search("INFO", makeSearchFunc(i))
|
||||
if err != nil {
|
||||
t.Fatal("Failed to search for line:", err)
|
||||
}
|
||||
if !match {
|
||||
t.Error("Expected Search to return exact match")
|
||||
}
|
||||
line, err := gr.ReadLine()
|
||||
if err != nil {
|
||||
t.Fatal("Failed to read line after search", err)
|
||||
}
|
||||
if !strings.HasPrefix(line, Fmt("INFO %v ", i)) {
|
||||
t.Fatal("Failed to get correct line")
|
||||
}
|
||||
// Make sure we can continue to read from there.
|
||||
cur := i + 1
|
||||
for {
|
||||
line, err := gr.ReadLine()
|
||||
if err == io.EOF {
|
||||
if cur == 99+1 {
|
||||
// OK!
|
||||
break
|
||||
} else {
|
||||
t.Fatal("Got EOF after the wrong INFO #")
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Fatal("Error reading line", err)
|
||||
}
|
||||
if !strings.HasPrefix(line, "INFO ") {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(line, Fmt("INFO %v ", cur)) {
|
||||
t.Fatalf("Unexpected INFO #. Expected %v got:\n%v", cur, line)
|
||||
}
|
||||
cur += 1
|
||||
}
|
||||
gr.Close()
|
||||
}
|
||||
|
||||
// Now search for something that is too small.
|
||||
// We should get the first available line.
|
||||
{
|
||||
gr, match, err := g.Search("INFO", makeSearchFunc(-999))
|
||||
if err != nil {
|
||||
t.Fatal("Failed to search for line:", err)
|
||||
}
|
||||
if match {
|
||||
t.Error("Expected Search to not return exact match")
|
||||
}
|
||||
line, err := gr.ReadLine()
|
||||
if err != nil {
|
||||
t.Fatal("Failed to read line after search", err)
|
||||
}
|
||||
if !strings.HasPrefix(line, "INFO 0 ") {
|
||||
t.Error("Failed to fetch correct line, which is the earliest INFO")
|
||||
}
|
||||
err = gr.Close()
|
||||
if err != nil {
|
||||
t.Error("Failed to close GroupReader", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Now search for something that is too large.
|
||||
// We should get an EOF error.
|
||||
{
|
||||
gr, _, err := g.Search("INFO", makeSearchFunc(999))
|
||||
if err != io.EOF {
|
||||
t.Error("Expected to get an EOF error")
|
||||
}
|
||||
if gr != nil {
|
||||
t.Error("Expected to get nil GroupReader")
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
destroyTestGroup(t, g)
|
||||
}
|
||||
|
||||
func TestRotateFile(t *testing.T) {
|
||||
g := createTestGroup(t, 0)
|
||||
g.WriteLine("Line 1")
|
||||
g.WriteLine("Line 2")
|
||||
g.WriteLine("Line 3")
|
||||
g.Flush()
|
||||
g.RotateFile()
|
||||
g.WriteLine("Line 4")
|
||||
g.WriteLine("Line 5")
|
||||
g.WriteLine("Line 6")
|
||||
g.Flush()
|
||||
|
||||
// Read g.Head.Path+"000"
|
||||
body1, err := ioutil.ReadFile(g.Head.Path + ".000")
|
||||
if err != nil {
|
||||
t.Error("Failed to read first rolled file")
|
||||
}
|
||||
if string(body1) != "Line 1\nLine 2\nLine 3\n" {
|
||||
t.Errorf("Got unexpected contents: [%v]", string(body1))
|
||||
}
|
||||
|
||||
// Read g.Head.Path
|
||||
body2, err := ioutil.ReadFile(g.Head.Path)
|
||||
if err != nil {
|
||||
t.Error("Failed to read first rolled file")
|
||||
}
|
||||
if string(body2) != "Line 4\nLine 5\nLine 6\n" {
|
||||
t.Errorf("Got unexpected contents: [%v]", string(body2))
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
destroyTestGroup(t, g)
|
||||
}
|
||||
|
||||
func TestFindLast1(t *testing.T) {
|
||||
g := createTestGroup(t, 0)
|
||||
|
||||
g.WriteLine("Line 1")
|
||||
g.WriteLine("Line 2")
|
||||
g.WriteLine("# a")
|
||||
g.WriteLine("Line 3")
|
||||
g.Flush()
|
||||
g.RotateFile()
|
||||
g.WriteLine("Line 4")
|
||||
g.WriteLine("Line 5")
|
||||
g.WriteLine("Line 6")
|
||||
g.WriteLine("# b")
|
||||
g.Flush()
|
||||
|
||||
match, found, err := g.FindLast("#")
|
||||
if err != nil {
|
||||
t.Error("Unexpected error", err)
|
||||
}
|
||||
if !found {
|
||||
t.Error("Expected found=True")
|
||||
}
|
||||
if match != "# b" {
|
||||
t.Errorf("Unexpected match: [%v]", match)
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
destroyTestGroup(t, g)
|
||||
}
|
||||
|
||||
func TestFindLast2(t *testing.T) {
|
||||
g := createTestGroup(t, 0)
|
||||
|
||||
g.WriteLine("Line 1")
|
||||
g.WriteLine("Line 2")
|
||||
g.WriteLine("Line 3")
|
||||
g.Flush()
|
||||
g.RotateFile()
|
||||
g.WriteLine("# a")
|
||||
g.WriteLine("Line 4")
|
||||
g.WriteLine("Line 5")
|
||||
g.WriteLine("# b")
|
||||
g.WriteLine("Line 6")
|
||||
g.Flush()
|
||||
|
||||
match, found, err := g.FindLast("#")
|
||||
if err != nil {
|
||||
t.Error("Unexpected error", err)
|
||||
}
|
||||
if !found {
|
||||
t.Error("Expected found=True")
|
||||
}
|
||||
if match != "# b" {
|
||||
t.Errorf("Unexpected match: [%v]", match)
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
destroyTestGroup(t, g)
|
||||
}
|
||||
|
||||
func TestFindLast3(t *testing.T) {
|
||||
g := createTestGroup(t, 0)
|
||||
|
||||
g.WriteLine("Line 1")
|
||||
g.WriteLine("# a")
|
||||
g.WriteLine("Line 2")
|
||||
g.WriteLine("# b")
|
||||
g.WriteLine("Line 3")
|
||||
g.Flush()
|
||||
g.RotateFile()
|
||||
g.WriteLine("Line 4")
|
||||
g.WriteLine("Line 5")
|
||||
g.WriteLine("Line 6")
|
||||
g.Flush()
|
||||
|
||||
match, found, err := g.FindLast("#")
|
||||
if err != nil {
|
||||
t.Error("Unexpected error", err)
|
||||
}
|
||||
if !found {
|
||||
t.Error("Expected found=True")
|
||||
}
|
||||
if match != "# b" {
|
||||
t.Errorf("Unexpected match: [%v]", match)
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
destroyTestGroup(t, g)
|
||||
}
|
||||
|
||||
func TestFindLast4(t *testing.T) {
|
||||
g := createTestGroup(t, 0)
|
||||
|
||||
g.WriteLine("Line 1")
|
||||
g.WriteLine("Line 2")
|
||||
g.WriteLine("Line 3")
|
||||
g.Flush()
|
||||
g.RotateFile()
|
||||
g.WriteLine("Line 4")
|
||||
g.WriteLine("Line 5")
|
||||
g.WriteLine("Line 6")
|
||||
g.Flush()
|
||||
|
||||
match, found, err := g.FindLast("#")
|
||||
if err != nil {
|
||||
t.Error("Unexpected error", err)
|
||||
}
|
||||
if found {
|
||||
t.Error("Expected found=False")
|
||||
}
|
||||
if match != "" {
|
||||
t.Errorf("Unexpected match: [%v]", match)
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
destroyTestGroup(t, g)
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
package autofile
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func init() {
|
||||
initSighupWatcher()
|
||||
}
|
||||
|
||||
var sighupWatchers *SighupWatcher
|
||||
var sighupCounter int32 // For testing
|
||||
|
||||
func initSighupWatcher() {
|
||||
sighupWatchers = newSighupWatcher()
|
||||
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGHUP)
|
||||
|
||||
go func() {
|
||||
for _ = range c {
|
||||
sighupWatchers.closeAll()
|
||||
atomic.AddInt32(&sighupCounter, 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Watchces for SIGHUP events and notifies registered AutoFiles
|
||||
type SighupWatcher struct {
|
||||
mtx sync.Mutex
|
||||
autoFiles map[string]*AutoFile
|
||||
}
|
||||
|
||||
func newSighupWatcher() *SighupWatcher {
|
||||
return &SighupWatcher{
|
||||
autoFiles: make(map[string]*AutoFile, 10),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *SighupWatcher) addAutoFile(af *AutoFile) {
|
||||
w.mtx.Lock()
|
||||
w.autoFiles[af.ID] = af
|
||||
w.mtx.Unlock()
|
||||
}
|
||||
|
||||
// If AutoFile isn't registered or was already removed, does nothing.
|
||||
func (w *SighupWatcher) removeAutoFile(af *AutoFile) {
|
||||
w.mtx.Lock()
|
||||
delete(w.autoFiles, af.ID)
|
||||
w.mtx.Unlock()
|
||||
}
|
||||
|
||||
func (w *SighupWatcher) closeAll() {
|
||||
w.mtx.Lock()
|
||||
for _, af := range w.autoFiles {
|
||||
af.closeFile()
|
||||
}
|
||||
w.mtx.Unlock()
|
||||
}
|
Loading…
Reference in New Issue