consensus: only fsync wal after internal msgs

This commit is contained in:
Ethan Buchman 2018-05-20 14:40:01 -04:00
parent 2c40966e46
commit 082a02e6d1
6 changed files with 42 additions and 21 deletions

View File

@ -19,10 +19,6 @@ in a case of bug.
**ABCI app** (name for built-in, URL for self-written if it's publicly available): **ABCI app** (name for built-in, URL for self-written if it's publicly available):
**Merkleeyes version** (use `git rev-parse --verify HEAD`, skip if you don't use it):
**Environment**: **Environment**:
- **OS** (e.g. from /etc/os-release): - **OS** (e.g. from /etc/os-release):
- **Install tools**: - **Install tools**:

View File

@ -20,6 +20,7 @@ FEATURES
IMPROVEMENTS IMPROVEMENTS
- [docs] Lots of updates - [docs] Lots of updates
- [consensus] Only Fsync() the WAL before executing msgs from ourselves
BUG FIXES BUG FIXES

View File

@ -218,15 +218,15 @@ func (e ReachedHeightToStopError) Error() string {
return fmt.Sprintf("reached height to stop %d", e.height) return fmt.Sprintf("reached height to stop %d", e.height)
} }
// Save simulates WAL's crashing by sending an error to the panicCh and then // Write simulates WAL's crashing by sending an error to the panicCh and then
// exiting the cs.receiveRoutine. // exiting the cs.receiveRoutine.
func (w *crashingWAL) Save(m WALMessage) { func (w *crashingWAL) Write(m WALMessage) {
if endMsg, ok := m.(EndHeightMessage); ok { if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop { if endMsg.Height == w.heightToStop {
w.panicCh <- ReachedHeightToStopError{endMsg.Height} w.panicCh <- ReachedHeightToStopError{endMsg.Height}
runtime.Goexit() runtime.Goexit()
} else { } else {
w.next.Save(m) w.next.Write(m)
} }
return return
} }
@ -238,10 +238,14 @@ func (w *crashingWAL) Save(m WALMessage) {
runtime.Goexit() runtime.Goexit()
} else { } else {
w.msgIndex++ w.msgIndex++
w.next.Save(m) w.next.Write(m)
} }
} }
// WriteSync hands m straight to Write, so a synced write goes through the
// same path as a plain one. No additional sync step is performed in this
// method.
func (w *crashingWAL) WriteSync(m WALMessage) {
w.Write(m)
}
func (w *crashingWAL) Group() *auto.Group { return w.next.Group() } func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
return w.next.SearchForEndHeight(height, options) return w.next.SearchForEndHeight(height, options)

View File

@ -504,7 +504,7 @@ func (cs *ConsensusState) updateToState(state sm.State) {
func (cs *ConsensusState) newStep() { func (cs *ConsensusState) newStep() {
rs := cs.RoundStateEvent() rs := cs.RoundStateEvent()
cs.wal.Save(rs) cs.wal.Write(rs)
cs.nSteps++ cs.nSteps++
// newStep is called by updateToStep in NewConsensusState before the eventBus is set! // newStep is called by updateToStep in NewConsensusState before the eventBus is set!
if cs.eventBus != nil { if cs.eventBus != nil {
@ -542,16 +542,16 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
case height := <-cs.mempool.TxsAvailable(): case height := <-cs.mempool.TxsAvailable():
cs.handleTxsAvailable(height) cs.handleTxsAvailable(height)
case mi = <-cs.peerMsgQueue: case mi = <-cs.peerMsgQueue:
cs.wal.Save(mi) cs.wal.Write(mi)
// handles proposals, block parts, votes // handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities) // may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi) cs.handleMsg(mi)
case mi = <-cs.internalMsgQueue: case mi = <-cs.internalMsgQueue:
cs.wal.Save(mi) cs.wal.WriteSync(mi) // NOTE: fsync
// handles proposals, block parts, votes // handles proposals, block parts, votes
cs.handleMsg(mi) cs.handleMsg(mi)
case ti := <-cs.timeoutTicker.Chan(): // tockChan: case ti := <-cs.timeoutTicker.Chan(): // tockChan:
cs.wal.Save(ti) cs.wal.Write(ti)
// if the timeout is relevant to the rs // if the timeout is relevant to the rs
// go to the next step // go to the next step
cs.handleTimeout(ti, rs) cs.handleTimeout(ti, rs)
@ -1241,7 +1241,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// Either way, the ConsensusState should not be resumed until we // Either way, the ConsensusState should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after // successfully call ApplyBlock (ie. later here, or in Handshake after
// restart). // restart).
cs.wal.Save(EndHeightMessage{height}) cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync
fail.Fail() // XXX fail.Fail() // XXX

View File

@ -50,7 +50,8 @@ func RegisterWALMessages(cdc *amino.Codec) {
// WAL is an interface for any write-ahead logger. // WAL is an interface for any write-ahead logger.
type WAL interface { type WAL interface {
Save(WALMessage) Write(WALMessage)
WriteSync(WALMessage)
Group() *auto.Group Group() *auto.Group
SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error)
@ -98,7 +99,7 @@ func (wal *baseWAL) OnStart() error {
if err != nil { if err != nil {
return err return err
} else if size == 0 { } else if size == 0 {
wal.Save(EndHeightMessage{0}) wal.WriteSync(EndHeightMessage{0})
} }
err = wal.group.Start() err = wal.group.Start()
return err return err
@ -109,8 +110,22 @@ func (wal *baseWAL) OnStop() {
wal.group.Stop() wal.group.Stop()
} }
// called in newStep and for each pass in receiveRoutine // called in newStep and for each receive on the
func (wal *baseWAL) Save(msg WALMessage) { // peerMsgQueue and the timeoutTicker
// Write encodes msg through wal.enc, wrapped in a TimedWALMessage stamped
// with the current time. This method makes no explicit flush or sync call.
func (wal *baseWAL) Write(msg WALMessage) {
// A nil receiver is tolerated: the call is a no-op.
if wal == nil {
return
}
// Write the wal message
if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil {
// An encoding failure is not returned to the caller; it is handed to
// cmn.PanicQ together with the offending message.
// NOTE(review): presumably cmn.PanicQ panics — confirm against cmn.
cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
}
}
// called when we receive a msg from ourselves
// so that we write to disk before sending signed messages
func (wal *baseWAL) WriteSync(msg WALMessage) {
if wal == nil { if wal == nil {
return return
} }
@ -297,8 +312,9 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
type nilWAL struct{} type nilWAL struct{}
func (nilWAL) Save(m WALMessage) {} func (nilWAL) Write(m WALMessage) {}
func (nilWAL) Group() *auto.Group { return nil } func (nilWAL) WriteSync(m WALMessage) {}
func (nilWAL) Group() *auto.Group { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
return nil, false, nil return nil, false, nil
} }

View File

@ -83,7 +83,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
numBlocksWritten := make(chan struct{}) numBlocksWritten := make(chan struct{})
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten) wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
// see wal.go#103 // see wal.go#103
wal.Save(EndHeightMessage{0}) wal.Write(EndHeightMessage{0})
consensusState.wal = wal consensusState.wal = wal
if err := consensusState.Start(); err != nil { if err := consensusState.Start(); err != nil {
@ -166,7 +166,7 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
// Save writes message to the internal buffer except when heightToStop is // Write writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and // reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing. // skip writing.
func (w *byteBufferWAL) Save(m WALMessage) { func (w *byteBufferWAL) Write(m WALMessage) {
if w.stopped { if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m) w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return return
@ -189,6 +189,10 @@ func (w *byteBufferWAL) Save(m WALMessage) {
} }
} }
// WriteSync hands m straight to Write; no additional sync step is performed
// in this method.
func (w *byteBufferWAL) WriteSync(m WALMessage) {
w.Write(m)
}
func (w *byteBufferWAL) Group() *auto.Group { func (w *byteBufferWAL) Group() *auto.Group {
panic("not implemented") panic("not implemented")
} }