raft: correct the setup sequence for applied index related test (#900)

This commit is contained in:
Trung Nguyen 2019-12-05 16:46:44 -05:00 committed by Samer Falah
parent d9ded9d572
commit 89067fa93e
1 changed files with 50 additions and 6 deletions

View File

@ -2,6 +2,7 @@ package raft
import (
"crypto/ecdsa"
"encoding/binary"
"fmt"
"io/ioutil"
"net"
@ -11,6 +12,8 @@ import (
"time"
"unsafe"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
@ -25,7 +28,9 @@ import (
// pm.advanceAppliedIndex() and state updates are in different
// transaction boundaries hence there's a probablity that they are
// out of sync due to premature shutdown
func IgnoreTestProtocolManager_whenAppliedIndexOutOfSync(t *testing.T) {
func TestProtocolManager_whenAppliedIndexOutOfSync(t *testing.T) {
logger := log.New()
logger.SetHandler(log.StreamHandler(os.Stdout, log.TerminalFormat(false)))
tmpWorkingDir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
@ -52,7 +57,7 @@ func IgnoreTestProtocolManager_whenAppliedIndexOutOfSync(t *testing.T) {
}
waitFunc := func() {
for {
time.Sleep(200 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
for i := 0; i < count; i++ {
if raftNodes[i].raftProtocolManager.role == minterRole {
return
@ -61,15 +66,26 @@ func IgnoreTestProtocolManager_whenAppliedIndexOutOfSync(t *testing.T) {
}
}
waitFunc()
// update the index to mimic the issue (set applied index behind for node 0)
raftNodes[0].raftProtocolManager.advanceAppliedIndex(1)
// now stop and restart the nodes
logger.Debug("stop the cluster")
for i := 0; i < count; i++ {
if err := raftNodes[i].Stop(); err != nil {
t.Fatal(err)
}
// somehow the wal dir is still being locked that causes failures in subsequent start
// we need to check here to make sure everything is fully stopped
for isWalDirStillLocked(fmt.Sprintf("%s/node%d/raft-wal", tmpWorkingDir, i+1)) {
logger.Debug("sleep...", "i", i)
time.Sleep(10 * time.Millisecond)
}
logger.Debug("node stopped", "id", i)
}
log.Debug("restart raft cluster")
logger.Debug("update applied index")
// update the index to mimic the issue (set applied index behind for node 0)
if err := writeAppliedIndex(tmpWorkingDir, 0, 1); err != nil {
t.Fatal(err)
}
//time.Sleep(3 * time.Second)
logger.Debug("restart the cluster")
for i := 0; i < count; i++ {
if s, err := startRaftNode(uint16(i+1), ports[i], tmpWorkingDir, nodeKeys[i], peers); err != nil {
t.Fatal(err)
@ -80,6 +96,34 @@ func IgnoreTestProtocolManager_whenAppliedIndexOutOfSync(t *testing.T) {
waitFunc()
}
func isWalDirStillLocked(walDir string) bool {
var snap walpb.Snapshot
w, err := wal.Open(walDir, snap)
if err != nil {
return true
}
defer func() {
_ = w.Close()
}()
return false
}
func writeAppliedIndex(workingDir string, node int, index uint64) error {
db, err := openQuorumRaftDb(fmt.Sprintf("%s/node%d/quorum-raft-state", workingDir, node+1))
if err != nil {
return err
}
defer func() {
_ = db.Close()
}()
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, index)
if err := db.Put(appliedDbKey, buf, noFsync); err != nil {
return err
}
return nil
}
func mustNewNodeKey(t *testing.T) *ecdsa.PrivateKey {
k, err := crypto.GenerateKey()
if err != nil {