tendermint/consensus/wal.go

138 lines
3.2 KiB
Go

package consensus
import (
"bufio"
"os"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
)
//--------------------------------------------------------
// types and functions for savings consensus messages
type ConsensusLogMessage struct {
Time time.Time `json:"time"`
Msg ConsensusLogMessageInterface `json:"msg"`
}
type ConsensusLogMessageInterface interface{}
var _ = wire.RegisterInterface(
struct{ ConsensusLogMessageInterface }{},
wire.ConcreteType{types.EventDataRoundState{}, 0x01},
wire.ConcreteType{msgInfo{}, 0x02},
wire.ConcreteType{timeoutInfo{}, 0x03},
)
//--------------------------------------------------------
// Simple write-ahead logger
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay
// TODO: currently the wal is overwritten during replay catchup
// give it a mode so it's either reading or appending - must read to end to start appending again
type WAL struct {
fp *os.File
exists bool // if the file already existed (restarted process)
done chan struct{}
light bool // ignore block parts
}
func NewWAL(file string, light bool) (*WAL, error) {
var walExists bool
if _, err := os.Stat(file); err == nil {
walExists = true
}
fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
return &WAL{
fp: fp,
exists: walExists,
done: make(chan struct{}),
light: light,
}, nil
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
if wal != nil {
if wal.light {
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
if mi, ok := clm.(msgInfo); ok {
_ = mi
if mi.PeerKey != "" {
return
}
}
}
var n int
var err error
wire.WriteJSON(ConsensusLogMessage{time.Now(), clm}, wal.fp, &n, &err)
wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line
if err != nil {
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm))
}
}
}
// Must not be called concurrently with a write.
func (wal *WAL) Close() {
if wal != nil {
wal.fp.Close()
}
wal.done <- struct{}{}
}
func (wal *WAL) Wait() {
<-wal.done
}
func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
var current int64
// start at the end
current, err = wal.fp.Seek(0, 2)
if err != nil {
return
}
// backup until we find the the right line
// current is how far we are from the beginning
for {
current -= 1
if current < 0 {
wal.fp.Seek(0, 0) // back to beginning
return
}
// backup one and read a new byte
if _, err = wal.fp.Seek(current, 0); err != nil {
return
}
b := make([]byte, 1)
if _, err = wal.fp.Read(b); err != nil {
return
}
if b[0] == '\n' || len(b) == 0 {
nLines += 1
// read a full line
reader := bufio.NewReader(wal.fp)
lineBytes, _ := reader.ReadBytes('\n')
if len(lineBytes) == 0 {
continue
}
if found(lineBytes) {
wal.fp.Seek(0, 1) // (?)
wal.fp.Seek(current, 0)
return
}
}
}
}