consensus: wal.Flush() and cleanup replay tests

This commit is contained in:
Ethan Buchman 2016-12-17 23:43:17 -05:00
parent f33cc3fb3b
commit dcbb35089f
2 changed files with 35 additions and 38 deletions

View File

@ -14,6 +14,11 @@ import (
"github.com/tendermint/tendermint/types"
)
// TODO: these tests ensure we can always recover from any state of the wal,
// assuming a related state of the priv val
// it would be better to verify explicitly which states we can recover from without the wal
// and which ones we need the wal for
var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
// the priv validator changes step at these lines for a block with 1 val and 1 part
@ -142,6 +147,16 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu
return cs, newBlockCh, lastMsg, walDir
}
func readJSON(t *testing.T, walMsg string) TimedWALMessage {
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(walMsg), &err)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
return msg
}
//-----------------------------------------------
// Test the log at every iteration, and set the privVal last step
// as if the log was written after signing, before the crash
@ -164,14 +179,9 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.proposeLine
cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) // propose
// Set LastSig
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(proposalMsg), &err)
msg := readJSON(t, proposalMsg)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
// Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
@ -180,40 +190,24 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) {
func TestReplayCrashBeforeWritePrevote(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.prevoteLine
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) {
// Set LastSig
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
toPV(cs.privValidator).LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
testReplayCrashBeforeWriteVote(t, thisCase, thisCase.prevoteLine, types.EventStringCompleteProposal())
}
}
func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.precommitLine
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // precommit
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) {
// Set LastSig
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
toPV(cs.privValidator).LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
testReplayCrashBeforeWriteVote(t, thisCase, thisCase.precommitLine, types.EventStringPolka())
}
}
func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) {
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote
types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) {
msg := readJSON(t, voteMsg)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
// Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
toPV(cs.privValidator).LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
}

View File

@ -79,7 +79,6 @@ func (wal *WAL) Save(wmsg WALMessage) {
if wal.light {
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
if mi, ok := wmsg.(msgInfo); ok {
_ = mi
if mi.PeerKey != "" {
return
}
@ -97,6 +96,10 @@ func (wal *WAL) Save(wmsg WALMessage) {
if err != nil {
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg))
}
// TODO: only flush when necessary
if err := wal.group.Flush(); err != nil {
PanicQ(Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
}
}
func (wal *WAL) writeHeight(height int) {