Fix raft applied index out of range (#880)

This commit is contained in:
Zhou Zhiyao 2019-12-02 21:05:32 +08:00 committed by Samer Falah
parent b7edc0b6c9
commit ef99f6d82c
5 changed files with 186 additions and 21 deletions

View File

@ -5,10 +5,9 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"

View File

@ -11,29 +11,27 @@ import (
"sync"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/fileutil"
raftTypes "github.com/coreos/etcd/pkg/types"
etcdRaft "github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
mapset "github.com/deckarep/golang-set"
"github.com/syndtr/goleveldb/leveldb"
"golang.org/x/net/context"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/coreos/etcd/etcdserver/stats"
raftTypes "github.com/coreos/etcd/pkg/types"
etcdRaft "github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/syndtr/goleveldb/leveldb"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/rlp"
)
type ProtocolManager struct {
@ -446,6 +444,19 @@ func (pm *ProtocolManager) startRaft() {
// the single call to `pm.applyNewChainHead` for more context.
lastAppliedIndex = lastPersistedCommittedIndex
}
// fix raft applied index out of range
firstIndex, err := pm.raftStorage.FirstIndex()
if err != nil {
panic(fmt.Sprintf("failed to read last persisted applied index from raft while restarting: %v", err))
}
lastPersistedAppliedIndex := firstIndex - 1
if lastPersistedAppliedIndex > lastAppliedIndex {
log.Debug("set lastAppliedIndex to lastPersistedAppliedIndex", "last applied index", lastAppliedIndex, "last persisted applied index", lastPersistedAppliedIndex)
lastAppliedIndex = lastPersistedAppliedIndex
pm.advanceAppliedIndex(lastAppliedIndex)
}
}
}

155
raft/handler_test.go Normal file
View File

@ -0,0 +1,155 @@
package raft
import (
"crypto/ecdsa"
"fmt"
"io/ioutil"
"net"
"os"
"reflect"
"testing"
"time"
"unsafe"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
)
// TestProtocolManager_whenAppliedIndexOutOfSync starts a three-node raft
// cluster, moves the applied index of the first node behind, then stops and
// restarts every node and waits for a minter to appear again.
//
// pm.advanceAppliedIndex() and state updates are in different
// transaction boundaries hence there's a probability that they are
// out of sync due to premature shutdown.
func TestProtocolManager_whenAppliedIndexOutOfSync(t *testing.T) {
	const (
		count = 3
		// minterWaitTimeout bounds each wait for a minter so that a cluster
		// which never elects one fails this test instead of hanging the run.
		minterWaitTimeout = time.Minute
	)

	tmpWorkingDir, err := ioutil.TempDir("", "")
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		_ = os.RemoveAll(tmpWorkingDir)
	}()

	ports := make([]uint16, count)
	nodeKeys := make([]*ecdsa.PrivateKey, count)
	peers := make([]*enode.Node, count)
	for i := 0; i < count; i++ {
		ports[i] = nextPort(t)
		nodeKeys[i] = mustNewNodeKey(t)
		peers[i] = enode.NewV4(&(nodeKeys[i].PublicKey), net.IPv4(127, 0, 0, 1), 0, 0, int(ports[i]))
	}

	raftNodes := make([]*RaftService, count)
	// Deferred after the RemoveAll above, so it runs first: whatever nodes are
	// still registered get stopped before their data directories are deleted.
	defer func() {
		for _, n := range raftNodes {
			if n == nil {
				continue
			}
			if err := n.Stop(); err != nil {
				t.Logf("failed to stop raft node during cleanup: %v", err)
			}
		}
	}()

	// startCluster (re)starts every node on its original port, key and data dir.
	startCluster := func() {
		for i := 0; i < count; i++ {
			s, err := startRaftNode(uint16(i+1), ports[i], tmpWorkingDir, nodeKeys[i], peers)
			if err != nil {
				t.Fatal(err)
			}
			raftNodes[i] = s
		}
	}

	// waitForMinter polls until any node reports the minter role.
	// NOTE(review): role is read here without any visible synchronization;
	// confirm whether the protocol manager expects a lock to be held.
	waitForMinter := func() {
		deadline := time.Now().Add(minterWaitTimeout)
		for time.Now().Before(deadline) {
			time.Sleep(200 * time.Millisecond)
			for _, n := range raftNodes {
				if n.raftProtocolManager.role == minterRole {
					return
				}
			}
		}
		t.Fatalf("no raft node became minter within %v", minterWaitTimeout)
	}

	startCluster()
	waitForMinter()

	// update the index to mimic the issue (set applied index behind for node 0)
	raftNodes[0].raftProtocolManager.advanceAppliedIndex(1)

	// now stop and restart the nodes
	for i := 0; i < count; i++ {
		n := raftNodes[i]
		// Unregister first so the deferred cleanup never stops a node twice.
		raftNodes[i] = nil
		if err := n.Stop(); err != nil {
			t.Fatal(err)
		}
	}
	log.Debug("restart raft cluster")
	startCluster()
	waitForMinter()
}
// mustNewNodeKey returns a newly generated node private key and fails the
// test immediately if key generation returns an error.
func mustNewNodeKey(t *testing.T) *ecdsa.PrivateKey {
	// Attribute a failure to the calling test line rather than this helper.
	t.Helper()
	k, err := crypto.GenerateKey()
	if err != nil {
		t.Fatal(err)
	}
	return k
}
// nextPort asks the operating system for a currently free TCP port by
// listening on port 0, and returns the port number that was assigned.
//
// The probing listener is closed before returning so that the caller can bind
// the port itself; leaving it open would keep the port occupied until the
// garbage collector happened to finalize the listener. The test is failed if
// listening or closing returns an error.
func nextPort(t *testing.T) uint16 {
	// Attribute a failure to the calling test line rather than this helper.
	t.Helper()
	listener, err := net.Listen("tcp", ":0")
	if err != nil {
		t.Fatal(err)
	}
	port := uint16(listener.Addr().(*net.TCPAddr).Port)
	if err := listener.Close(); err != nil {
		t.Fatal(err)
	}
	return port
}
// prepareServiceContext builds a node.Config whose P2P private key is key and
// a node.ServiceContext with a fresh event mux, then wires the config into
// the context's unexported "config" field.
//
// Any panic raised while doing so is recovered and reported through err, in
// which case both ctx and cfg are returned as nil.
func prepareServiceContext(key *ecdsa.PrivateKey) (ctx *node.ServiceContext, cfg *node.Config, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		err = fmt.Errorf("%s", recovered)
		ctx, cfg = nil, nil
	}()

	cfg = &node.Config{P2P: p2p.Config{PrivateKey: key}}
	ctx = &node.ServiceContext{EventMux: new(event.TypeMux)}

	// The config field is unexported, so reflection alone refuses to set it.
	// Re-deriving the value from its raw address yields a settable view of it.
	unexported := reflect.ValueOf(ctx).Elem().FieldByName("config")
	settable := reflect.NewAt(unexported.Type(), unsafe.Pointer(unexported.UnsafeAddr())).Elem()
	settable.Set(reflect.ValueOf(cfg))

	return ctx, cfg, nil
}
// startRaftNode creates and starts a raft service for the node with the given
// raft id and raft port, storing its data under "<tmpWorkingDir>/node<id>".
//
// It builds a service context from key, an eth service using the Quorum test
// chain config, the raft service itself, and a p2p server configured with the
// same key. The p2p server is started first and then handed to the raft
// service's Start. The started service is returned on success; the first
// error encountered is returned otherwise.
func startRaftNode(id, port uint16, tmpWorkingDir string, key *ecdsa.PrivateKey, nodes []*enode.Node) (*RaftService, error) {
	datadir := fmt.Sprintf("%s/node%d", tmpWorkingDir, id)
	ctx, _, err := prepareServiceContext(key)
	if err != nil {
		return nil, err
	}
	e, err := eth.New(ctx, &eth.Config{
		Genesis: &core.Genesis{Config: params.QuorumTestChainConfig},
	})
	if err != nil {
		return nil, err
	}
	s, err := New(ctx, params.QuorumTestChainConfig, id, port, false, 100*time.Millisecond, e, nodes, datadir, false)
	if err != nil {
		return nil, err
	}
	srv := &p2p.Server{
		Config: p2p.Config{
			PrivateKey: key,
		},
	}
	if err := srv.Start(); err != nil {
		return nil, fmt.Errorf("could not start: %v", err)
	}
	if err := s.Start(srv); err != nil {
		// The p2p server is already running at this point and the caller gets
		// no handle to it, so shut it down here rather than leaking it.
		srv.Stop()
		return nil, err
	}
	return s, nil
}

View File

@ -9,17 +9,17 @@ import (
"sort"
"time"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal/walpb"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
)

View File

@ -1,12 +1,12 @@
package raft
import (
mapset "github.com/deckarep/golang-set"
"gopkg.in/oleiade/lane.v1"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/deckarep/golang-set"
"gopkg.in/oleiade/lane.v1"
)
// The speculative chain represents blocks that we have minted which haven't been accepted into the chain yet, building