diff --git a/raft/backend.go b/raft/backend.go index 6b90db256..ca3d6644b 100644 --- a/raft/backend.go +++ b/raft/backend.go @@ -5,10 +5,9 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" diff --git a/raft/handler.go b/raft/handler.go index b3bf42fee..e871c603d 100755 --- a/raft/handler.go +++ b/raft/handler.go @@ -11,29 +11,27 @@ import ( "sync" "time" - "golang.org/x/net/context" - + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/fileutil" + raftTypes "github.com/coreos/etcd/pkg/types" + etcdRaft "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" + mapset "github.com/deckarep/golang-set" + "github.com/syndtr/goleveldb/leveldb" + "golang.org/x/net/context" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/rlp" - - "github.com/coreos/etcd/etcdserver/stats" - raftTypes "github.com/coreos/etcd/pkg/types" - etcdRaft "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/rafthttp" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/syndtr/goleveldb/leveldb" - - mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/rlp" ) type ProtocolManager struct { @@ -446,6 +444,19 @@ func (pm *ProtocolManager) startRaft() { // the single call to `pm.applyNewChainHead` for more context. lastAppliedIndex = lastPersistedCommittedIndex } + + // fix raft applied index out of range + firstIndex, err := pm.raftStorage.FirstIndex() + if err != nil { + panic(fmt.Sprintf("failed to read last persisted applied index from raft while restarting: %v", err)) + } + lastPersistedAppliedIndex := firstIndex - 1 + if lastPersistedAppliedIndex > lastAppliedIndex { + log.Debug("set lastAppliedIndex to lastPersistedAppliedIndex", "last applied index", lastAppliedIndex, "last persisted applied index", lastPersistedAppliedIndex) + + lastAppliedIndex = lastPersistedAppliedIndex + pm.advanceAppliedIndex(lastAppliedIndex) + } } } diff --git a/raft/handler_test.go b/raft/handler_test.go new file mode 100644 index 000000000..35fd561cd --- /dev/null +++ b/raft/handler_test.go @@ -0,0 +1,155 @@ +package raft + +import ( + "crypto/ecdsa" + "fmt" + "io/ioutil" + "net" + "os" + "reflect" + "testing" + "time" + "unsafe" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/params" +) + +// pm.advanceAppliedIndex() and state updates are in different +// transaction boundaries hence there's a probablity that they are +// out of sync due to premature shutdown +func TestProtocolManager_whenAppliedIndexOutOfSync(t *testing.T) { + tmpWorkingDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = os.RemoveAll(tmpWorkingDir) + }() + count := 3 + ports := make([]uint16, count) + nodeKeys := make([]*ecdsa.PrivateKey, count) + peers := make([]*enode.Node, count) + for i := 0; i < count; i++ { + ports[i] = nextPort(t) + nodeKeys[i] = mustNewNodeKey(t) + peers[i] = enode.NewV4(&(nodeKeys[i].PublicKey), net.IPv4(127, 0, 0, 1), 0, 0, int(ports[i])) + } + raftNodes := make([]*RaftService, count) + for i := 0; i < count; i++ { + if s, err := startRaftNode(uint16(i+1), ports[i], tmpWorkingDir, nodeKeys[i], peers); err != nil { + t.Fatal(err) + } else { + raftNodes[i] = s + } + } + waitFunc := func() { + for { + time.Sleep(200 * time.Millisecond) + for i := 0; i < count; i++ { + if raftNodes[i].raftProtocolManager.role == minterRole { + return + } + } + } + } + waitFunc() + // update the index to mimic the issue (set applied index behind for node 0) + raftNodes[0].raftProtocolManager.advanceAppliedIndex(1) + // now stop and restart the nodes + for i := 0; i < count; i++ { + if err := raftNodes[i].Stop(); err != nil { + t.Fatal(err) + } + } + log.Debug("restart raft cluster") + for i := 0; i < count; i++ { + if s, err := startRaftNode(uint16(i+1), ports[i], tmpWorkingDir, nodeKeys[i], peers); err != nil { + t.Fatal(err) + } else { + raftNodes[i] = s + } + } + waitFunc() +} + +func mustNewNodeKey(t *testing.T) *ecdsa.PrivateKey { + k, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + return k +} + +func nextPort(t *testing.T) uint16 { + listener, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + return uint16(listener.Addr().(*net.TCPAddr).Port) +} + +func prepareServiceContext(key *ecdsa.PrivateKey) (ctx *node.ServiceContext, cfg *node.Config, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("%s", r) + ctx = nil + cfg = nil + } + }() + cfg = &node.Config{ + P2P: p2p.Config{ + PrivateKey: key, + }, + } + ctx = &node.ServiceContext{ + EventMux: new(event.TypeMux), + } + // config is private field so we need some workaround to set the value + configField := reflect.ValueOf(ctx).Elem().FieldByName("config") + configField = reflect.NewAt(configField.Type(), unsafe.Pointer(configField.UnsafeAddr())).Elem() + configField.Set(reflect.ValueOf(cfg)) + return +} + +func startRaftNode(id, port uint16, tmpWorkingDir string, key *ecdsa.PrivateKey, nodes []*enode.Node) (*RaftService, error) { + datadir := fmt.Sprintf("%s/node%d", tmpWorkingDir, id) + + ctx, _, err := prepareServiceContext(key) + if err != nil { + return nil, err + } + + e, err := eth.New(ctx, ð.Config{ + Genesis: &core.Genesis{Config: params.QuorumTestChainConfig}, + }) + if err != nil { + return nil, err + } + + s, err := New(ctx, params.QuorumTestChainConfig, id, port, false, 100*time.Millisecond, e, nodes, datadir, false) + if err != nil { + return nil, err + } + + srv := &p2p.Server{ + Config: p2p.Config{ + PrivateKey: key, + }, + } + if err := srv.Start(); err != nil { + return nil, fmt.Errorf("could not start: %v", err) + } + if err := s.Start(srv); err != nil { + return nil, err + } + + return s, nil +} diff --git a/raft/snapshot.go b/raft/snapshot.go index 0c3386c48..fefb6a139 100644 --- a/raft/snapshot.go +++ b/raft/snapshot.go @@ -9,17 +9,17 @@ import ( "sort" "time" - mapset "github.com/deckarep/golang-set" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal/walpb" + mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/rlp" ) diff --git a/raft/speculative_chain.go b/raft/speculative_chain.go index 5dcb82658..3fa8806cc 100644 --- a/raft/speculative_chain.go +++ b/raft/speculative_chain.go @@ -1,12 +1,12 @@ package raft import ( + mapset "github.com/deckarep/golang-set" + "gopkg.in/oleiade/lane.v1" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - - "github.com/deckarep/golang-set" - "gopkg.in/oleiade/lane.v1" ) // The speculative chain represents blocks that we have minted which haven't been accepted into the chain yet, building