From 71baad59dfa202f0a6a93fe572908dd83dcefd06 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 11 Oct 2016 16:06:46 -0400 Subject: [PATCH] replay: ensure cs.height and wal.height match --- blockchain/reactor.go | 2 +- consensus/replay.go | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 9a68ad88..5946e77d 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -129,7 +129,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) return } - log.Notice("Receive", "src", src, "chID", chID, "msg", msg) + log.Info("Receive", "src", src, "chID", chID, "msg", msg) switch msg := msg.(type) { case *bcBlockRequestMessage: diff --git a/consensus/replay.go b/consensus/replay.go index c77d0558..afdbcf9f 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -77,7 +77,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // replay only those messages since the last block. // timeoutRoutine should run concurrently to read off tickChan -func (cs *ConsensusState) catchupReplay(height int) error { +func (cs *ConsensusState) catchupReplay(csHeight int) error { if !cs.wal.Exists() { return nil } @@ -88,6 +88,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { // starting from end of file, // read messages until a new height is found + var walHeight int nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { var err error var msg ConsensusLogMessage @@ -96,8 +97,8 @@ func (cs *ConsensusState) catchupReplay(height int) error { panic(Fmt("Failed to read cs_msg_log json: %v", err)) } m, ok := msg.Msg.(types.EventDataRoundState) + walHeight = m.Height if ok && m.Step == RoundStepNewHeight.String() { - // TODO: ensure the height matches return true } return false @@ -107,12 +108,23 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } + // ensure the height matches + if walHeight != csHeight { + var err error + if walHeight > csHeight { + err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight)) + } else { + log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight) + } + return err + } + var beginning bool // if we had to go back to the beginning if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { beginning = true } - log.Notice("Catchup by replaying consensus messages", "n", nLines) + log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight) // now we can replay the latest nLines on consensus state // note we can't use scan because we've already been reading from the file