diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index e85a6ed6..010aea40 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,3 +26,4 @@ BUG FIXES: - [common] Safely handle cases where atomic write files already exist [#2109](https://github.com/tendermint/tendermint/issues/2109) - [privval] fix a deadline for accepting new connections in socket private validator. +- [node] Fully exit when CTRL-C is pressed even if consensus state panics [#2072] diff --git a/consensus/state.go b/consensus/state.go index f66a872e..6ffe6ef6 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -553,9 +553,30 @@ func (cs *ConsensusState) newStep() { // Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities. // ConsensusState must be locked before any internal state is updated. func (cs *ConsensusState) receiveRoutine(maxSteps int) { + onExit := func(cs *ConsensusState) { + // NOTE: the internalMsgQueue may have signed messages from our + // priv_val that haven't hit the WAL, but its ok because + // priv_val tracks LastSig + + // close wal now that we're done writing to it + cs.wal.Stop() + cs.wal.Wait() + + close(cs.done) + } + defer func() { if r := recover(); r != nil { cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack())) + // stop gracefully + // + // NOTE: We most probably shouldn't be running any further when there is + // some unexpected panic. Some unknown error happened, and so we don't + // know if that will result in the validator signing an invalid thing. It + // might be worthwhile to explore a mechanism for manual resuming via + // some console or secure RPC system, but for now, halting the chain upon + // unexpected consensus bugs sounds like the better option. + onExit(cs) } }() @@ -588,16 +609,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit(): - - // NOTE: the internalMsgQueue may have signed messages from our - // priv_val that haven't hit the WAL, but its ok because - // priv_val tracks LastSig - - // close wal now that we're done writing to it - cs.wal.Stop() - cs.wal.Wait() - - close(cs.done) + onExit(cs) return } } diff --git a/libs/autofile/autofile.go b/libs/autofile/autofile.go index 313da678..2f1bb4fd 100644 --- a/libs/autofile/autofile.go +++ b/libs/autofile/autofile.go @@ -35,18 +35,20 @@ const autoFileOpenDuration = 1000 * time.Millisecond // Automatically closes and re-opens file for writing. // This is useful for using a log file with the logrotate tool. type AutoFile struct { - ID string - Path string - ticker *time.Ticker - mtx sync.Mutex - file *os.File + ID string + Path string + ticker *time.Ticker + tickerStopped chan struct{} // closed when ticker is stopped + mtx sync.Mutex + file *os.File } func OpenAutoFile(path string) (af *AutoFile, err error) { af = &AutoFile{ - ID: cmn.RandStr(12) + ":" + path, - Path: path, - ticker: time.NewTicker(autoFileOpenDuration), + ID: cmn.RandStr(12) + ":" + path, + Path: path, + ticker: time.NewTicker(autoFileOpenDuration), + tickerStopped: make(chan struct{}), } if err = af.openFile(); err != nil { return @@ -58,18 +60,18 @@ func OpenAutoFile(path string) (af *AutoFile, err error) { func (af *AutoFile) Close() error { af.ticker.Stop() + close(af.tickerStopped) err := af.closeFile() sighupWatchers.removeAutoFile(af) return err } func (af *AutoFile) processTicks() { - for { - _, ok := <-af.ticker.C - if !ok { - return // Done. - } + select { + case <-af.ticker.C: af.closeFile() + case <-af.tickerStopped: + return } } diff --git a/libs/autofile/group.go b/libs/autofile/group.go index b4368ed9..e747f04d 100644 --- a/libs/autofile/group.go +++ b/libs/autofile/group.go @@ -85,7 +85,6 @@ func OpenGroup(headPath string) (g *Group, err error) { Head: head, headBuf: bufio.NewWriterSize(head, 4096*10), Dir: dir, - ticker: time.NewTicker(groupCheckDuration), headSizeLimit: defaultHeadSizeLimit, totalSizeLimit: defaultTotalSizeLimit, minIndex: 0, @@ -102,6 +101,7 @@ func OpenGroup(headPath string) (g *Group, err error) { // OnStart implements Service by starting the goroutine that checks file and // group limits. func (g *Group) OnStart() error { + g.ticker = time.NewTicker(groupCheckDuration) go g.processTicks() return nil } @@ -199,21 +199,15 @@ func (g *Group) Flush() error { } func (g *Group) processTicks() { - for { - _, ok := <-g.ticker.C - if !ok { - return // Done. - } + select { + case <-g.ticker.C: g.checkHeadSizeLimit() g.checkTotalSizeLimit() + case <-g.Quit(): + return } } -// NOTE: for testing -func (g *Group) stopTicker() { - g.ticker.Stop() -} - // NOTE: this function is called manually in tests. func (g *Group) checkHeadSizeLimit() { limit := g.HeadSizeLimit() diff --git a/libs/autofile/group_test.go b/libs/autofile/group_test.go index c7e8725c..d87bdba8 100644 --- a/libs/autofile/group_test.go +++ b/libs/autofile/group_test.go @@ -16,23 +16,25 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" ) -// NOTE: Returned group has ticker stopped -func createTestGroup(t *testing.T, headSizeLimit int64) *Group { +func createTestGroupWithHeadSizeLimit(t *testing.T, headSizeLimit int64) *Group { testID := cmn.RandStr(12) testDir := "_test_" + testID err := cmn.EnsureDir(testDir, 0700) require.NoError(t, err, "Error creating dir") + headPath := testDir + "/myfile" g, err := OpenGroup(headPath) require.NoError(t, err, "Error opening Group") - g.SetHeadSizeLimit(headSizeLimit) - g.stopTicker() require.NotEqual(t, nil, g, "Failed to create Group") + + g.SetHeadSizeLimit(headSizeLimit) + return g } func destroyTestGroup(t *testing.T, g *Group) { g.Close() + err := os.RemoveAll(g.Dir) require.NoError(t, err, "Error removing test Group directory") } @@ -45,7 +47,7 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota } func TestCheckHeadSizeLimit(t *testing.T) { - g := createTestGroup(t, 1000*1000) + g := createTestGroupWithHeadSizeLimit(t, 1000*1000) // At first, there are no files. assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0) @@ -107,7 +109,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { } func TestSearch(t *testing.T) { - g := createTestGroup(t, 10*1000) + g := createTestGroupWithHeadSizeLimit(t, 10*1000) // Create some files in the group that have several INFO lines in them. // Try to put the INFO lines in various spots. @@ -208,7 +210,7 @@ func TestSearch(t *testing.T) { } func TestRotateFile(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") @@ -238,7 +240,7 @@ func TestRotateFile(t *testing.T) { } func TestFindLast1(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") @@ -262,7 +264,7 @@ func TestFindLast1(t *testing.T) { } func TestFindLast2(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") @@ -286,7 +288,7 @@ func TestFindLast2(t *testing.T) { } func TestFindLast3(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("# a") @@ -310,7 +312,7 @@ func TestFindLast3(t *testing.T) { } func TestFindLast4(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") @@ -332,7 +334,7 @@ func TestFindLast4(t *testing.T) { } func TestWrite(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) written := []byte("Medusa") g.Write(written) @@ -353,7 +355,7 @@ func TestWrite(t *testing.T) { // test that Read reads the required amount of bytes from all the files in the // group and returns no error if n == size of the given slice. func TestGroupReaderRead(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) professor := []byte("Professor Monster") g.Write(professor) @@ -382,7 +384,7 @@ func TestGroupReaderRead(t *testing.T) { // test that Read returns an error if number of bytes read < size of // the given slice. Subsequent call should return 0, io.EOF. func TestGroupReaderRead2(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) professor := []byte("Professor Monster") g.Write(professor) @@ -413,7 +415,7 @@ func TestGroupReaderRead2(t *testing.T) { } func TestMinIndex(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) assert.Zero(t, g.MinIndex(), "MinIndex should be zero at the beginning") @@ -422,7 +424,7 @@ func TestMinIndex(t *testing.T) { } func TestMaxIndex(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) assert.Zero(t, g.MaxIndex(), "MaxIndex should be zero at the beginning") diff --git a/libs/autofile/sighup_watcher.go b/libs/autofile/sighup_watcher.go index 56fbd4d8..f72f12fc 100644 --- a/libs/autofile/sighup_watcher.go +++ b/libs/autofile/sighup_watcher.go @@ -18,13 +18,19 @@ var sighupCounter int32 // For testing func initSighupWatcher() { sighupWatchers = newSighupWatcher() - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGHUP) + hup := make(chan os.Signal, 1) + signal.Notify(hup, syscall.SIGHUP) + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt, syscall.SIGTERM) go func() { - for range c { + select { + case <-hup: sighupWatchers.closeAll() atomic.AddInt32(&sighupCounter, 1) + case <-quit: + return } }() } diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index ef7d7eda..9596b1d7 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -496,7 +496,6 @@ out: } saveFileTicker.Stop() a.saveToFile(a.filePath) - a.Logger.Info("Address handler done") } //----------------------------------------------------------