drain internalMsgQueue and wait for cswal on quit
This commit is contained in:
parent
c9ec9cf00e
commit
69d906f7dd
|
@ -130,7 +130,7 @@ func (cs *ConsensusState) catchupReplay(height int) error {
|
||||||
} else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
|
} else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// the first msg is (usually) the NewHeight event, so we can ignore it
|
// the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it
|
||||||
if !beginning && i == 1 {
|
if !beginning && i == 1 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -319,6 +319,9 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
|
||||||
|
|
||||||
func (cs *ConsensusState) OnStop() {
|
func (cs *ConsensusState) OnStop() {
|
||||||
cs.QuitService.OnStop()
|
cs.QuitService.OnStop()
|
||||||
|
if cs.wal != nil {
|
||||||
|
cs.wal.Wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open file to log all consensus messages and timeouts for deterministic accountability
|
// Open file to log all consensus messages and timeouts for deterministic accountability
|
||||||
|
@ -616,6 +619,19 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
||||||
// go to the next step
|
// go to the next step
|
||||||
cs.handleTimeout(ti, rs)
|
cs.handleTimeout(ti, rs)
|
||||||
case <-cs.Quit:
|
case <-cs.Quit:
|
||||||
|
|
||||||
|
// drain the internalMsgQueue in case we eg. signed a proposal but it didn't hit the wal
|
||||||
|
FLUSH:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case mi = <-cs.internalMsgQueue:
|
||||||
|
cs.wal.Save(mi)
|
||||||
|
cs.handleMsg(mi, rs)
|
||||||
|
default:
|
||||||
|
break FLUSH
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// close wal now that we're done writing to it
|
// close wal now that we're done writing to it
|
||||||
if cs.wal != nil {
|
if cs.wal != nil {
|
||||||
cs.wal.Close()
|
cs.wal.Close()
|
||||||
|
|
|
@ -37,6 +37,8 @@ var _ = wire.RegisterInterface(
|
||||||
type WAL struct {
|
type WAL struct {
|
||||||
fp *os.File
|
fp *os.File
|
||||||
exists bool // if the file already existed (restarted process)
|
exists bool // if the file already existed (restarted process)
|
||||||
|
|
||||||
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWAL(file string) (*WAL, error) {
|
func NewWAL(file string) (*WAL, error) {
|
||||||
|
@ -51,6 +53,7 @@ func NewWAL(file string) (*WAL, error) {
|
||||||
return &WAL{
|
return &WAL{
|
||||||
fp: fp,
|
fp: fp,
|
||||||
exists: walExists,
|
exists: walExists,
|
||||||
|
done: make(chan struct{}),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,6 +75,11 @@ func (wal *WAL) Close() {
|
||||||
if wal != nil {
|
if wal != nil {
|
||||||
wal.fp.Close()
|
wal.fp.Close()
|
||||||
}
|
}
|
||||||
|
wal.done <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wal *WAL) Wait() {
|
||||||
|
<-wal.done
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
|
func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
|
||||||
|
|
Loading…
Reference in New Issue