diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 8cde817a..149ffb2f 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -14,6 +14,11 @@ import ( "github.com/tendermint/tendermint/types" ) +// TODO: these tests ensure we can always recover from any state of the wal, +// assuming a related state of the priv val +// it would be better to verify explicitly which states we can recover from without the wal +// and which ones we need the wal for + var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") // the priv validator changes step at these lines for a block with 1 val and 1 part @@ -142,6 +147,16 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu return cs, newBlockCh, lastMsg, walDir } +func readJSON(t *testing.T, walMsg string) TimedWALMessage { + var err error + var msg TimedWALMessage + wire.ReadJSON(&msg, []byte(walMsg), &err) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + return msg +} + //----------------------------------------------- // Test the log at every iteration, and set the privVal last step // as if the log was written after signing, before the crash @@ -164,14 +179,9 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) { for _, thisCase := range testCases { lineNum := thisCase.proposeLine cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) // propose - // Set LastSig - var err error - var msg TimedWALMessage - wire.ReadJSON(&msg, []byte(proposalMsg), &err) + msg := readJSON(t, proposalMsg) proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } + // Set LastSig toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum) @@ -180,40 +190,24 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) { func TestReplayCrashBeforeWritePrevote(t *testing.T) { for _, thisCase := range testCases { - lineNum := thisCase.prevoteLine - cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote - types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) { - // Set LastSig - var err error - var msg TimedWALMessage - wire.ReadJSON(&msg, []byte(voteMsg), &err) - vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } - toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) - toPV(cs.privValidator).LastSignature = vote.Vote.Signature - }) - runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum) + testReplayCrashBeforeWriteVote(t, thisCase, thisCase.prevoteLine, types.EventStringCompleteProposal()) } } func TestReplayCrashBeforeWritePrecommit(t *testing.T) { for _, thisCase := range testCases { - lineNum := thisCase.precommitLine - cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // precommit - types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) { - // Set LastSig - var err error - var msg TimedWALMessage - wire.ReadJSON(&msg, []byte(voteMsg), &err) - vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } - toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) - toPV(cs.privValidator).LastSignature = vote.Vote.Signature - }) - runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum) + testReplayCrashBeforeWriteVote(t, thisCase, thisCase.precommitLine, types.EventStringPolka()) } } + +func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) { + cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote + types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) { + msg := readJSON(t, voteMsg) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + // Set LastSig + toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + toPV(cs.privValidator).LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum) +} diff --git a/consensus/wal.go b/consensus/wal.go index ea16e776..2c03027c 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -79,7 +79,6 @@ func (wal *WAL) Save(wmsg WALMessage) { if wal.light { // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts) if mi, ok := wmsg.(msgInfo); ok { - _ = mi if mi.PeerKey != "" { return } @@ -97,6 +96,10 @@ func (wal *WAL) Save(wmsg WALMessage) { if err != nil { PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg)) } + // TODO: only flush when necessary + if err := wal.group.Flush(); err != nil { + PanicQ(Fmt("Error flushing consensus wal buf to file. Error: %v \n", err)) + } } func (wal *WAL) writeHeight(height int) {