Merge pull request #2135 from tendermint/2072-unresponsive-tm-after-cs-failure

consensus: non-responsive to CTRL-C if consensus state panics
This commit is contained in:
Ethan Buchman 2018-08-03 23:39:25 -04:00 committed by GitHub
commit d5b5e5a2e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 70 additions and 54 deletions

View File

@ -26,3 +26,4 @@ BUG FIXES:
- [common] Safely handle cases where atomic write files already exist [#2109](https://github.com/tendermint/tendermint/issues/2109)
- [privval] fix a deadline for accepting new connections in socket private
  validator.
- [node] Fully exit when CTRL-C is pressed even if consensus state panics [#2072]

View File

@ -553,9 +553,30 @@ func (cs *ConsensusState) newStep() {
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities. // Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
// ConsensusState must be locked before any internal state is updated. // ConsensusState must be locked before any internal state is updated.
func (cs *ConsensusState) receiveRoutine(maxSteps int) { func (cs *ConsensusState) receiveRoutine(maxSteps int) {
onExit := func(cs *ConsensusState) {
// NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but it's ok because
// priv_val tracks LastSig
// close wal now that we're done writing to it
cs.wal.Stop()
cs.wal.Wait()
close(cs.done)
}
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack())) cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack()))
// stop gracefully
//
// NOTE: We most probably shouldn't be running any further when there is
// some unexpected panic. Some unknown error happened, and so we don't
// know if that will result in the validator signing an invalid thing. It
// might be worthwhile to explore a mechanism for manual resuming via
// some console or secure RPC system, but for now, halting the chain upon
// unexpected consensus bugs sounds like the better option.
onExit(cs)
} }
}() }()
@ -588,16 +609,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// go to the next step // go to the next step
cs.handleTimeout(ti, rs) cs.handleTimeout(ti, rs)
case <-cs.Quit(): case <-cs.Quit():
onExit(cs)
// NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but it's ok because
// priv_val tracks LastSig
// close wal now that we're done writing to it
cs.wal.Stop()
cs.wal.Wait()
close(cs.done)
return return
} }
} }

View File

@ -35,18 +35,20 @@ const autoFileOpenDuration = 1000 * time.Millisecond
// Automatically closes and re-opens file for writing. // Automatically closes and re-opens file for writing.
// This is useful for using a log file with the logrotate tool. // This is useful for using a log file with the logrotate tool.
type AutoFile struct { type AutoFile struct {
ID string ID string
Path string Path string
ticker *time.Ticker ticker *time.Ticker
mtx sync.Mutex tickerStopped chan struct{} // closed when ticker is stopped
file *os.File mtx sync.Mutex
file *os.File
} }
func OpenAutoFile(path string) (af *AutoFile, err error) { func OpenAutoFile(path string) (af *AutoFile, err error) {
af = &AutoFile{ af = &AutoFile{
ID: cmn.RandStr(12) + ":" + path, ID: cmn.RandStr(12) + ":" + path,
Path: path, Path: path,
ticker: time.NewTicker(autoFileOpenDuration), ticker: time.NewTicker(autoFileOpenDuration),
tickerStopped: make(chan struct{}),
} }
if err = af.openFile(); err != nil { if err = af.openFile(); err != nil {
return return
@ -58,18 +60,18 @@ func OpenAutoFile(path string) (af *AutoFile, err error) {
func (af *AutoFile) Close() error { func (af *AutoFile) Close() error {
af.ticker.Stop() af.ticker.Stop()
close(af.tickerStopped)
err := af.closeFile() err := af.closeFile()
sighupWatchers.removeAutoFile(af) sighupWatchers.removeAutoFile(af)
return err return err
} }
func (af *AutoFile) processTicks() { func (af *AutoFile) processTicks() {
for { select {
_, ok := <-af.ticker.C case <-af.ticker.C:
if !ok {
return // Done.
}
af.closeFile() af.closeFile()
case <-af.tickerStopped:
return
} }
} }

View File

@ -85,7 +85,6 @@ func OpenGroup(headPath string) (g *Group, err error) {
Head: head, Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10), headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir, Dir: dir,
ticker: time.NewTicker(groupCheckDuration),
headSizeLimit: defaultHeadSizeLimit, headSizeLimit: defaultHeadSizeLimit,
totalSizeLimit: defaultTotalSizeLimit, totalSizeLimit: defaultTotalSizeLimit,
minIndex: 0, minIndex: 0,
@ -102,6 +101,7 @@ func OpenGroup(headPath string) (g *Group, err error) {
// OnStart implements Service by starting the goroutine that checks file and // OnStart implements Service by starting the goroutine that checks file and
// group limits. // group limits.
func (g *Group) OnStart() error { func (g *Group) OnStart() error {
g.ticker = time.NewTicker(groupCheckDuration)
go g.processTicks() go g.processTicks()
return nil return nil
} }
@ -199,21 +199,15 @@ func (g *Group) Flush() error {
} }
func (g *Group) processTicks() { func (g *Group) processTicks() {
for { select {
_, ok := <-g.ticker.C case <-g.ticker.C:
if !ok {
return // Done.
}
g.checkHeadSizeLimit() g.checkHeadSizeLimit()
g.checkTotalSizeLimit() g.checkTotalSizeLimit()
case <-g.Quit():
return
} }
} }
// NOTE: for testing
func (g *Group) stopTicker() {
g.ticker.Stop()
}
// NOTE: this function is called manually in tests. // NOTE: this function is called manually in tests.
func (g *Group) checkHeadSizeLimit() { func (g *Group) checkHeadSizeLimit() {
limit := g.HeadSizeLimit() limit := g.HeadSizeLimit()

View File

@ -16,23 +16,25 @@ import (
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
) )
// NOTE: Returned group has ticker stopped func createTestGroupWithHeadSizeLimit(t *testing.T, headSizeLimit int64) *Group {
func createTestGroup(t *testing.T, headSizeLimit int64) *Group {
testID := cmn.RandStr(12) testID := cmn.RandStr(12)
testDir := "_test_" + testID testDir := "_test_" + testID
err := cmn.EnsureDir(testDir, 0700) err := cmn.EnsureDir(testDir, 0700)
require.NoError(t, err, "Error creating dir") require.NoError(t, err, "Error creating dir")
headPath := testDir + "/myfile" headPath := testDir + "/myfile"
g, err := OpenGroup(headPath) g, err := OpenGroup(headPath)
require.NoError(t, err, "Error opening Group") require.NoError(t, err, "Error opening Group")
g.SetHeadSizeLimit(headSizeLimit)
g.stopTicker()
require.NotEqual(t, nil, g, "Failed to create Group") require.NotEqual(t, nil, g, "Failed to create Group")
g.SetHeadSizeLimit(headSizeLimit)
return g return g
} }
func destroyTestGroup(t *testing.T, g *Group) { func destroyTestGroup(t *testing.T, g *Group) {
g.Close() g.Close()
err := os.RemoveAll(g.Dir) err := os.RemoveAll(g.Dir)
require.NoError(t, err, "Error removing test Group directory") require.NoError(t, err, "Error removing test Group directory")
} }
@ -45,7 +47,7 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota
} }
func TestCheckHeadSizeLimit(t *testing.T) { func TestCheckHeadSizeLimit(t *testing.T) {
g := createTestGroup(t, 1000*1000) g := createTestGroupWithHeadSizeLimit(t, 1000*1000)
// At first, there are no files. // At first, there are no files.
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0) assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0)
@ -107,7 +109,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
} }
func TestSearch(t *testing.T) { func TestSearch(t *testing.T) {
g := createTestGroup(t, 10*1000) g := createTestGroupWithHeadSizeLimit(t, 10*1000)
// Create some files in the group that have several INFO lines in them. // Create some files in the group that have several INFO lines in them.
// Try to put the INFO lines in various spots. // Try to put the INFO lines in various spots.
@ -208,7 +210,7 @@ func TestSearch(t *testing.T) {
} }
func TestRotateFile(t *testing.T) { func TestRotateFile(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
g.WriteLine("Line 1") g.WriteLine("Line 1")
g.WriteLine("Line 2") g.WriteLine("Line 2")
g.WriteLine("Line 3") g.WriteLine("Line 3")
@ -238,7 +240,7 @@ func TestRotateFile(t *testing.T) {
} }
func TestFindLast1(t *testing.T) { func TestFindLast1(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
g.WriteLine("Line 1") g.WriteLine("Line 1")
g.WriteLine("Line 2") g.WriteLine("Line 2")
@ -262,7 +264,7 @@ func TestFindLast1(t *testing.T) {
} }
func TestFindLast2(t *testing.T) { func TestFindLast2(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
g.WriteLine("Line 1") g.WriteLine("Line 1")
g.WriteLine("Line 2") g.WriteLine("Line 2")
@ -286,7 +288,7 @@ func TestFindLast2(t *testing.T) {
} }
func TestFindLast3(t *testing.T) { func TestFindLast3(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
g.WriteLine("Line 1") g.WriteLine("Line 1")
g.WriteLine("# a") g.WriteLine("# a")
@ -310,7 +312,7 @@ func TestFindLast3(t *testing.T) {
} }
func TestFindLast4(t *testing.T) { func TestFindLast4(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
g.WriteLine("Line 1") g.WriteLine("Line 1")
g.WriteLine("Line 2") g.WriteLine("Line 2")
@ -332,7 +334,7 @@ func TestFindLast4(t *testing.T) {
} }
func TestWrite(t *testing.T) { func TestWrite(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
written := []byte("Medusa") written := []byte("Medusa")
g.Write(written) g.Write(written)
@ -353,7 +355,7 @@ func TestWrite(t *testing.T) {
// test that Read reads the required amount of bytes from all the files in the // test that Read reads the required amount of bytes from all the files in the
// group and returns no error if n == size of the given slice. // group and returns no error if n == size of the given slice.
func TestGroupReaderRead(t *testing.T) { func TestGroupReaderRead(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
professor := []byte("Professor Monster") professor := []byte("Professor Monster")
g.Write(professor) g.Write(professor)
@ -382,7 +384,7 @@ func TestGroupReaderRead(t *testing.T) {
// test that Read returns an error if number of bytes read < size of // test that Read returns an error if number of bytes read < size of
// the given slice. Subsequent call should return 0, io.EOF. // the given slice. Subsequent call should return 0, io.EOF.
func TestGroupReaderRead2(t *testing.T) { func TestGroupReaderRead2(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
professor := []byte("Professor Monster") professor := []byte("Professor Monster")
g.Write(professor) g.Write(professor)
@ -413,7 +415,7 @@ func TestGroupReaderRead2(t *testing.T) {
} }
func TestMinIndex(t *testing.T) { func TestMinIndex(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
assert.Zero(t, g.MinIndex(), "MinIndex should be zero at the beginning") assert.Zero(t, g.MinIndex(), "MinIndex should be zero at the beginning")
@ -422,7 +424,7 @@ func TestMinIndex(t *testing.T) {
} }
func TestMaxIndex(t *testing.T) { func TestMaxIndex(t *testing.T) {
g := createTestGroup(t, 0) g := createTestGroupWithHeadSizeLimit(t, 0)
assert.Zero(t, g.MaxIndex(), "MaxIndex should be zero at the beginning") assert.Zero(t, g.MaxIndex(), "MaxIndex should be zero at the beginning")

View File

@ -18,13 +18,19 @@ var sighupCounter int32 // For testing
func initSighupWatcher() { func initSighupWatcher() {
sighupWatchers = newSighupWatcher() sighupWatchers = newSighupWatcher()
c := make(chan os.Signal, 1) hup := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP) signal.Notify(hup, syscall.SIGHUP)
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
go func() { go func() {
for range c { select {
case <-hup:
sighupWatchers.closeAll() sighupWatchers.closeAll()
atomic.AddInt32(&sighupCounter, 1) atomic.AddInt32(&sighupCounter, 1)
case <-quit:
return
} }
}() }()
} }

View File

@ -496,7 +496,6 @@ out:
} }
saveFileTicker.Stop() saveFileTicker.Stop()
a.saveToFile(a.filePath) a.saveToFile(a.filePath)
a.Logger.Info("Address handler done")
} }
//---------------------------------------------------------- //----------------------------------------------------------