From 082a02e6d1102d319418790284fe98ba4e021198 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 20 May 2018 14:40:01 -0400 Subject: [PATCH 1/2] consensus: only fsync wal after internal msgs --- .github/ISSUE_TEMPLATE | 4 ---- CHANGELOG.md | 1 + consensus/replay_test.go | 12 ++++++++---- consensus/state.go | 10 +++++----- consensus/wal.go | 28 ++++++++++++++++++++++------ consensus/wal_generator.go | 8 ++++++-- 6 files changed, 42 insertions(+), 21 deletions(-) diff --git a/.github/ISSUE_TEMPLATE b/.github/ISSUE_TEMPLATE index e714a6ff..c9c1f6a0 100644 --- a/.github/ISSUE_TEMPLATE +++ b/.github/ISSUE_TEMPLATE @@ -19,10 +19,6 @@ in a case of bug. **ABCI app** (name for built-in, URL for self-written if it's publicly available): - -**Merkleeyes version** (use `git rev-parse --verify HEAD`, skip if you don't use it): - - **Environment**: - **OS** (e.g. from /etc/os-release): - **Install tools**: diff --git a/CHANGELOG.md b/CHANGELOG.md index 507b8319..4d2462c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ FEATURES IMPROVEMENTS - [docs] Lots of updates +- [consensus] Only Fsync() the WAL before executing msgs from ourselves BUG FIXES diff --git a/consensus/replay_test.go b/consensus/replay_test.go index ff0eee1c..84b1e118 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -218,15 +218,15 @@ func (e ReachedHeightToStopError) Error() string { return fmt.Sprintf("reached height to stop %d", e.height) } -// Save simulate WAL's crashing by sending an error to the panicCh and then +// Write simulates WAL's crashing by sending an error to the panicCh and then // exiting the cs.receiveRoutine. 
-func (w *crashingWAL) Save(m WALMessage) { +func (w *crashingWAL) Write(m WALMessage) { if endMsg, ok := m.(EndHeightMessage); ok { if endMsg.Height == w.heightToStop { w.panicCh <- ReachedHeightToStopError{endMsg.Height} runtime.Goexit() } else { - w.next.Save(m) + w.next.Write(m) } return } @@ -238,10 +238,14 @@ func (w *crashingWAL) Save(m WALMessage) { runtime.Goexit() } else { w.msgIndex++ - w.next.Save(m) + w.next.Write(m) } } +func (w *crashingWAL) WriteSync(m WALMessage) { + w.Write(m) +} + func (w *crashingWAL) Group() *auto.Group { return w.next.Group() } func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { return w.next.SearchForEndHeight(height, options) diff --git a/consensus/state.go b/consensus/state.go index e4477a9b..b5b94368 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -504,7 +504,7 @@ func (cs *ConsensusState) updateToState(state sm.State) { func (cs *ConsensusState) newStep() { rs := cs.RoundStateEvent() - cs.wal.Save(rs) + cs.wal.Write(rs) cs.nSteps++ // newStep is called by updateToStep in NewConsensusState before the eventBus is set! 
if cs.eventBus != nil { @@ -542,16 +542,16 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { case height := <-cs.mempool.TxsAvailable(): cs.handleTxsAvailable(height) case mi = <-cs.peerMsgQueue: - cs.wal.Save(mi) + cs.wal.Write(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi) case mi = <-cs.internalMsgQueue: - cs.wal.Save(mi) + cs.wal.WriteSync(mi) // NOTE: fsync // handles proposals, block parts, votes cs.handleMsg(mi) case ti := <-cs.timeoutTicker.Chan(): // tockChan: - cs.wal.Save(ti) + cs.wal.Write(ti) // if the timeout is relevant to the rs // go to the next step cs.handleTimeout(ti, rs) @@ -1241,7 +1241,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) { // Either way, the ConsensusState should not be resumed until we // successfully call ApplyBlock (ie. later here, or in Handshake after // restart). - cs.wal.Save(EndHeightMessage{height}) + cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync fail.Fail() // XXX diff --git a/consensus/wal.go b/consensus/wal.go index d22c3ea1..e9e05fc9 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -50,7 +50,8 @@ func RegisterWALMessages(cdc *amino.Codec) { // WAL is an interface for any write-ahead logger. 
type WAL interface { - Save(WALMessage) + Write(WALMessage) + WriteSync(WALMessage) Group() *auto.Group SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) @@ -98,7 +99,7 @@ func (wal *baseWAL) OnStart() error { if err != nil { return err } else if size == 0 { - wal.Save(EndHeightMessage{0}) + wal.WriteSync(EndHeightMessage{0}) } err = wal.group.Start() return err @@ -109,8 +110,22 @@ func (wal *baseWAL) OnStop() { wal.group.Stop() } -// called in newStep and for each pass in receiveRoutine -func (wal *baseWAL) Save(msg WALMessage) { +// called in newStep and for each receive on the +// peerMsgQueue and the timoutTicker +func (wal *baseWAL) Write(msg WALMessage) { + if wal == nil { + return + } + + // Write the wal message + if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil { + cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg)) + } +} + +// called when we receive a msg from ourselves +// so that we write to disk before sending signed messages +func (wal *baseWAL) WriteSync(msg WALMessage) { if wal == nil { return } @@ -297,8 +312,9 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { type nilWAL struct{} -func (nilWAL) Save(m WALMessage) {} -func (nilWAL) Group() *auto.Group { return nil } +func (nilWAL) Write(m WALMessage) {} +func (nilWAL) WriteSync(m WALMessage) {} +func (nilWAL) Group() *auto.Group { return nil } func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { return nil, false, nil } diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 18ff614c..dc364df0 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -83,7 +83,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { numBlocksWritten := make(chan struct{}) wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten) // see 
wal.go#103 - wal.Save(EndHeightMessage{0}) + wal.Write(EndHeightMessage{0}) consensusState.wal = wal if err := consensusState.Start(); err != nil { @@ -166,7 +166,7 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS // Save writes message to the internal buffer except when heightToStop is // reached, in which case it will signal the caller via signalWhenStopsTo and // skip writing. -func (w *byteBufferWAL) Save(m WALMessage) { +func (w *byteBufferWAL) Write(m WALMessage) { if w.stopped { w.logger.Debug("WAL already stopped. Not writing message", "msg", m) return @@ -189,6 +189,10 @@ func (w *byteBufferWAL) Save(m WALMessage) { } } +func (w *byteBufferWAL) WriteSync(m WALMessage) { + w.Write(m) +} + func (w *byteBufferWAL) Group() *auto.Group { panic("not implemented") } From ee4eb59355718f5a0c9cf3eb91ef08ab1f98f1e1 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 20 May 2018 16:44:08 -0400 Subject: [PATCH 2/2] update comments --- consensus/wal.go | 21 +++++++++------------ node/node_test.go | 2 +- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/consensus/wal.go b/consensus/wal.go index e9e05fc9..0db0dc50 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -110,8 +110,9 @@ func (wal *baseWAL) OnStop() { wal.group.Stop() } -// called in newStep and for each receive on the -// peerMsgQueue and the timoutTicker +// Write is called in newStep and for each receive on the +// peerMsgQueue and the timeoutTicker. 
+// NOTE: does not call fsync() func (wal *baseWAL) Write(msg WALMessage) { if wal == nil { return @@ -119,25 +120,21 @@ func (wal *baseWAL) Write(msg WALMessage) { // Write the wal message if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil { - cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg)) + panic(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg)) } } -// called when we receive a msg from ourselves -// so that we write to disk before sending signed messages +// WriteSync is called when we receive a msg from ourselves +// so that we write to disk before sending signed messages. +// NOTE: calls fsync() func (wal *baseWAL) WriteSync(msg WALMessage) { if wal == nil { return } - // Write the wal message - if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil { - cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg)) - } - - // TODO: only flush when necessary + wal.Write(msg) if err := wal.group.Flush(); err != nil { - cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err)) + panic(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err)) } } diff --git a/node/node_test.go b/node/node_test.go index ca539382..cdabdbb3 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -31,7 +31,7 @@ func TestNodeStartStop(t *testing.T) { assert.NoError(t, err) select { case <-blockCh: - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("timed out waiting for the node to produce a block") }