diff --git a/consensus/replay.go b/consensus/replay.go
index aa06d468..e929847c 100644
--- a/consensus/replay.go
+++ b/consensus/replay.go
@@ -130,7 +130,7 @@ func (cs *ConsensusState) catchupReplay(height int) error {
 		} else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
 			continue
 		}
-		// the first msg is (usually) the NewHeight event, so we can ignore it
+		// the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it
 		if !beginning && i == 1 {
 			continue
 		}
diff --git a/consensus/state.go b/consensus/state.go
index eb710960..6fdda408 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -319,6 +319,9 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
 
 func (cs *ConsensusState) OnStop() {
 	cs.QuitService.OnStop()
+	if cs.wal != nil {
+		cs.wal.Wait()
+	}
 }
 
 // Open file to log all consensus messages and timeouts for deterministic accountability
@@ -616,6 +619,19 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
 			// go to the next step
 			cs.handleTimeout(ti, rs)
 		case <-cs.Quit:
+
+			// drain the internalMsgQueue in case we e.g. signed a proposal but it didn't hit the wal
+		FLUSH:
+			for {
+				select {
+				case mi = <-cs.internalMsgQueue:
+					cs.wal.Save(mi)
+					cs.handleMsg(mi, rs)
+				default:
+					break FLUSH
+				}
+			}
+
 			// close wal now that we're done writing to it
 			if cs.wal != nil {
 				cs.wal.Close()
diff --git a/consensus/wal.go b/consensus/wal.go
index 46f7a9df..00536f9d 100644
--- a/consensus/wal.go
+++ b/consensus/wal.go
@@ -37,6 +37,8 @@ var _ = wire.RegisterInterface(
 type WAL struct {
 	fp     *os.File
 	exists bool // if the file already existed (restarted process)
+
+	done chan struct{}
 }
 
 func NewWAL(file string) (*WAL, error) {
@@ -51,6 +53,7 @@ func NewWAL(file string) (*WAL, error) {
 	return &WAL{
 		fp:     fp,
 		exists: walExists,
+		done:   make(chan struct{}),
 	}, nil
 }
 
@@ -72,6 +75,18 @@ func (wal *WAL) Close() {
 	if wal != nil {
 		wal.fp.Close()
+		// Close the channel instead of sending on it: a send on the unbuffered
+		// channel would block Close until someone calls Wait, whereas closing
+		// never blocks and releases every current and future Wait.
+		// Close must be called at most once; a second call panics on the
+		// already-closed channel.
+		close(wal.done)
 	}
+}
+
+// Wait blocks until Close has been called on the WAL.
+// A nil WAL has nothing to wait for, so Wait returns immediately.
+func (wal *WAL) Wait() {
+	if wal == nil {
+		return
+	}
+	<-wal.done
 }
 
 func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {