Dynamic raft membership support (#130)

Brian Schroeder 2017-08-28 17:43:44 -04:00 committed by Joel Burget
parent 18fc39b4ed
commit 5dfdecef4c
18 changed files with 1213 additions and 347 deletions

View File

@ -158,7 +158,9 @@ participating.
utils.SingleBlockMakerFlag,
utils.EnableNodePermissionFlag,
utils.RaftModeFlag,
utils.RaftBlockTime,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
}
app.Flags = append(app.Flags, debug.Flags...)

View File

@ -93,6 +93,15 @@ var AppHelpFlagGroups = []flagGroup{
utils.VoteMaxBlockTimeFlag,
},
},
{
Name: "RAFT",
Flags: []cli.Flag{
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
},
},
{
Name: "ACCOUNT",
Flags: []cli.Flag{

View File

@ -50,7 +50,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv2"
"gopkg.in/urfave/cli.v1"
"log"
)
func init() {
@ -380,11 +379,21 @@ var (
Name: "raft",
Usage: "If enabled, uses Raft instead of Quorum Chain for consensus",
}
RaftBlockTime = cli.IntFlag{
RaftBlockTimeFlag = cli.IntFlag{
Name: "raftblocktime",
Usage: "Amount of time between raft block creations in milliseconds",
Value: 50,
}
RaftJoinExistingFlag = cli.IntFlag{
Name: "raftjoinexisting",
Usage: "The raft ID to assume when joining an pre-existing cluster",
Value: 0,
}
RaftPortFlag = cli.IntFlag{
Name: "raftport",
Usage: "The port to bind for the raft transport",
Value: 50400,
}
)
// MakeDataDir retrieves the currently requested data directory, terminating
@ -724,8 +733,10 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) {
}
if ctx.GlobalBool(RaftModeFlag.Name) {
blockTimeMillis := ctx.GlobalInt(RaftBlockTime.Name)
blockTimeMillis := ctx.GlobalInt(RaftBlockTimeFlag.Name)
datadir := ctx.GlobalString(DataDirFlag.Name)
joinExistingId := ctx.GlobalInt(RaftJoinExistingFlag.Name)
raftPort := uint16(ctx.GlobalInt(RaftPortFlag.Name))
logger.DoLogRaft = true
@ -734,21 +745,35 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) {
blockTimeNanos := time.Duration(blockTimeMillis) * time.Millisecond
peers := stack.StaticNodes()
peerIds := make([]string, len(peers))
var myId int
for peerIdx, peer := range peers {
peerId := peer.ID.String()
peerIds[peerIdx] = peerId
if peerId == strId {
myId = peerIdx + 1
var myId uint16
var joinExisting bool
if joinExistingId > 0 {
myId = uint16(joinExistingId)
joinExisting = true
} else if len(peers) == 0 {
Fatalf("Raft-based consensus requires either (1) an initial peers list (in static-nodes.json) including this enode hash (%v), or (2) the flag --raftjoinexisting RAFT_ID, where RAFT_ID has been issued by an existing cluster member calling `raft.addPeer(ENODE_ID)` with an enode ID containing this node's enode hash.", strId)
} else {
peerIds := make([]string, len(peers))
for peerIdx, peer := range peers {
if !peer.HasRaftPort() {
Fatalf("raftport querystring parameter not specified in static-node enode ID: %v. please check your static-nodes.json file.", peer.String())
}
peerId := peer.ID.String()
peerIds[peerIdx] = peerId
if peerId == strId {
myId = uint16(peerIdx) + 1
}
}
if myId == 0 {
Fatalf("failed to find local enode ID (%v) amongst peer IDs: %v", strId, peerIds)
}
}
if myId == 0 {
log.Panicf("failed to find local enode ID (%v) amongst peer IDs: %v", strId, peerIds)
}
return raft.New(ctx, chainConfig, myId, blockTimeNanos, ethereum, peers, datadir)
return raft.New(ctx, chainConfig, myId, raftPort, joinExisting, blockTimeNanos, ethereum, peers, datadir)
}); err != nil {
Fatalf("Failed to register the Raft service: %v", err)
}

View File

@ -369,6 +369,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if p == nil {
return errUnknownPeer
}
if d.mode == BoundedFullSync {
err := d.syncWithPeerUntil(p, hash, td)
if err == nil {
d.processContent()
}
return err
}
return d.syncWithPeer(p, hash, td)
}
@ -1292,7 +1299,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
if d.mode == FullSync || d.mode == FastSync || d.mode == BoundedFullSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
@ -1354,7 +1361,7 @@ func (d *Downloader) processContent() error {
items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
for _, result := range results[:items] {
switch {
case d.mode == FullSync:
case d.mode == FullSync || d.mode == BoundedFullSync:
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
case d.mode == FastSync:
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
@ -1503,3 +1510,211 @@ func (d *Downloader) requestTTL() time.Duration {
}
return ttl
}
// Extra downloader functionality for non-proof-of-work consensus
// Synchronizes with a peer, but only up to the provided Hash
func (d *Downloader) syncWithPeerUntil(p *peer, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
}
}()
if p.version < 62 {
return errTooOld
}
glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
defer func(start time.Time) {
glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
}(time.Now())
remoteHeader, err := d.fetchHeader(p, hash)
if err != nil {
return err
}
remoteHeight := remoteHeader.Number.Uint64()
localHeight := d.headBlock().NumberU64()
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= localHeight || d.syncStatsChainOrigin > localHeight {
d.syncStatsChainOrigin = localHeight
}
d.syncStatsChainHeight = remoteHeight
d.syncStatsLock.Unlock()
d.queue.Prepare(localHeight+1, d.mode, uint64(0), remoteHeader)
if d.syncInitHook != nil {
d.syncInitHook(localHeight, remoteHeight)
}
return d.spawnSync(localHeight+1,
func() error { return d.fetchBoundedHeaders(p, localHeight+1, remoteHeight) },
func() error { return d.processHeaders(localHeight+1, td) },
func() error { return d.fetchBodies(localHeight + 1) },
func() error { return d.fetchReceipts(localHeight + 1) }, // Receipts are only retrieved during fast sync
func() error { return d.fetchNodeData() }, // Node state data is only retrieved during fast sync
)
}
// Fetches a single header from a peer
func (d *Downloader) fetchHeader(p *peer, hash common.Hash) (*types.Header, error) {
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
go p.getRelHeaders(hash, 1, 0, false)
timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
return nil, errCancelBlockFetch
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
return nil, errBadPeer
}
return headers[0], nil
case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p)
return nil, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
}
}
// Not defined in go's stdlib:
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
// Fetches headers between `from` and `to`, inclusive.
// Assumes invariant: from <= to.
func (d *Downloader) fetchBoundedHeaders(p *peer, from uint64, to uint64) error {
glog.V(logger.Debug).Infof("%v: directing header downloads from #%d to #%d", p, from, to)
defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
getHeaders := func(from uint64) {
request = time.Now()
timeout.Reset(d.requestTTL())
skeletonStart := from + uint64(MaxHeaderFetch) - 1
if skeleton {
if skeletonStart > to {
skeleton = false
}
}
if skeleton {
numSkeletonHeaders := minInt(MaxSkeletonSize, (int(to-from)+1)/MaxHeaderFetch)
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, numSkeletonHeaders, from)
go p.getAbsHeaders(skeletonStart, numSkeletonHeaders, MaxHeaderFetch-1, false)
} else {
// There are not enough headers remaining to warrant a skeleton fetch.
// Grab all of the remaining headers.
numHeaders := int(to-from) + 1
glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, numHeaders, from)
go p.getAbsHeaders(from, numHeaders, 0, false)
}
}
// Start pulling the header chain skeleton until all is done
getHeaders(from)
for {
select {
case <-d.cancelCh:
return errCancelHeaderFetch
case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id {
glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
headers := packet.(*headerPack).headers
// If we received a skeleton batch, resolve internals concurrently
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
return errInvalidChain
}
headers = filled[proced:]
from += uint64(proced)
}
// Insert all the new headers and fetch the next batch
if len(headers) > 0 {
glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
return errCancelHeaderFetch
}
from += uint64(len(headers))
}
if from <= to {
getHeaders(from)
} else {
// Notify the content fetchers that no more headers are inbound and return.
select {
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCancelHeaderFetch
}
}
case <-timeout.C:
// Header retrieval timed out, consider the peer bad and drop
glog.V(logger.Debug).Infof("%v: header request timed out", p)
headerTimeoutMeter.Mark(1)
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
select {
case d.headerProcCh <- nil:
case <-d.cancelCh:
}
return errBadPeer
}
}
}

View File

@ -23,4 +23,6 @@ const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
FastSync // Quickly download the headers, full sync only at the chain head
LightSync // Download only the headers and terminate afterwards
// Used by raft:
BoundedFullSync SyncMode = 100 // Perform a full sync until the requested hash, and no further
)

View File

@ -326,8 +326,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
defer msg.Discard()
if pm.raftMode {
if msg.Code != TxMsg {
glog.V(logger.Debug).Infof("raft: ignoring non-TxMsg with code %v", msg.Code)
if msg.Code != TxMsg &&
msg.Code != GetBlockHeadersMsg && msg.Code != BlockHeadersMsg &&
msg.Code != GetBlockBodiesMsg && msg.Code != BlockBodiesMsg {
glog.V(logger.Warn).Infof("raft: ignoring message with code %v", msg.Code)
return nil
}
}
@ -744,7 +748,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
// NOTE: Raft-based consensus currently assumes that geth broadcasts
// transactions to all peers in the network. A previous comment here
// indicated that this logic might change in the future to only send to a
// subset of peers. If this change occurs upstream, a merge conflict should
// arise here, and we should add logic to send to *all* peers in raft mode.
for _, peer := range peers {
peer.SendTransactions(types.Transactions{tx})
}

View File

@ -146,11 +146,15 @@ func (pm *ProtocolManager) syncer() {
if pm.peers.Len() < minDesiredPeerCount {
break
}
go pm.synchronise(pm.peers.BestPeer())
if !pm.raftMode {
go pm.synchronise(pm.peers.BestPeer())
}
case <-forceSync:
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
if !pm.raftMode {
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
}
case <-pm.noMorePeers:
return

View File

@ -524,6 +524,16 @@ web3._extend({
new web3._extend.Property({
name: 'role',
getter: 'raft_role'
}),
new web3._extend.Method({
name: 'addPeer',
call: 'raft_addPeer',
params: 1
}),
new web3._extend.Method({
name: 'removePeer',
call: 'raft_removePeer',
params: 1
})
]
})

View File

@ -44,6 +44,8 @@ type Node struct {
UDP, TCP uint16 // port numbers
ID NodeID // the node's public key
RaftPort uint16
// This is a cached copy of sha3(ID) which is used for node
// distance calculations. This is part of Node in order to make it
// possible to write tests that need a node at a certain distance.
@ -111,10 +113,23 @@ func (n *Node) String() string {
if n.UDP != n.TCP {
u.RawQuery = "discport=" + strconv.Itoa(int(n.UDP))
}
if n.HasRaftPort() {
raftQuery := "raftport=" + strconv.Itoa(int(n.RaftPort))
if len(u.RawQuery) > 0 {
u.RawQuery = u.RawQuery + "&" + raftQuery
} else {
u.RawQuery = raftQuery
}
}
}
return u.String()
}
func (n *Node) HasRaftPort() bool {
return n.RaftPort > 0
}
var incompleteNodeURL = regexp.MustCompile("(?i)^(?:enode://)?([0-9a-f]+)$")
// ParseNode parses a node designator.
@ -195,7 +210,17 @@ func parseComplete(rawurl string) (*Node, error) {
return nil, errors.New("invalid discport in query")
}
}
return NewNode(id, ip, uint16(udpPort), uint16(tcpPort)), nil
node := NewNode(id, ip, uint16(udpPort), uint16(tcpPort))
if qv.Get("raftport") != "" {
raftPort, err := strconv.ParseUint(qv.Get("raftport"), 10, 16)
if err != nil {
return nil, errors.New("invalid raftport in query")
}
node.RaftPort = uint16(raftPort)
}
return node, nil
}
// MustParseNode parses a node URL. It panics if the URL is not valid.

View File

@ -1,5 +1,15 @@
package raft
type RaftNodeInfo struct {
ClusterSize int `json:"clusterSize"`
Role string `json:"role"`
Address *Address `json:"address"`
PeerAddresses []*Address `json:"peerAddresses"`
RemovedPeerIds []uint16 `json:"removedPeerIds"`
AppliedIndex uint64 `json:"appliedIndex"`
SnapshotIndex uint64 `json:"snapshotIndex"`
}
type PublicRaftAPI struct {
raftService *RaftService
}
@ -9,10 +19,13 @@ func NewPublicRaftAPI(raftService *RaftService) *PublicRaftAPI {
}
func (s *PublicRaftAPI) Role() string {
role := s.raftService.raftProtocolManager.role
if role == minterRole {
return "minter"
} else {
return "verifier"
}
return s.raftService.raftProtocolManager.NodeInfo().Role
}
func (s *PublicRaftAPI) AddPeer(enodeId string) (uint16, error) {
return s.raftService.raftProtocolManager.ProposeNewPeer(enodeId)
}
func (s *PublicRaftAPI) RemovePeer(raftId uint16) {
s.raftService.raftProtocolManager.ProposePeerRemoval(raftId)
}

View File

@ -5,9 +5,9 @@ import (
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
@ -24,6 +24,7 @@ type RaftService struct {
txMu sync.Mutex
txPool *core.TxPool
accountManager *accounts.Manager
downloader *downloader.Downloader
raftProtocolManager *ProtocolManager
startPeers []*discover.Node
@ -33,27 +34,21 @@ type RaftService struct {
minter *minter
}
type RaftNodeInfo struct {
ClusterSize int `json:"clusterSize"`
Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
Role string `json:"role"`
}
func New(ctx *node.ServiceContext, chainConfig *core.ChainConfig, id int, blockTime time.Duration, e *eth.Ethereum, startPeers []*discover.Node, datadir string) (*RaftService, error) {
func New(ctx *node.ServiceContext, chainConfig *core.ChainConfig, raftId uint16, raftPort uint16, joinExisting bool, blockTime time.Duration, e *eth.Ethereum, startPeers []*discover.Node, datadir string) (*RaftService, error) {
service := &RaftService{
eventMux: ctx.EventMux,
chainDb: e.ChainDb(),
blockchain: e.BlockChain(),
txPool: e.TxPool(),
accountManager: e.AccountManager(),
downloader: e.Downloader(),
startPeers: startPeers,
}
service.minter = newMinter(chainConfig, service, blockTime)
var err error
if service.raftProtocolManager, err = NewProtocolManager(id, service.blockchain, service.eventMux, startPeers, datadir, service.minter); err != nil {
if service.raftProtocolManager, err = NewProtocolManager(raftId, raftPort, service.blockchain, service.eventMux, startPeers, joinExisting, datadir, service.minter, service.downloader); err != nil {
return nil, err
}
@ -85,8 +80,8 @@ func (service *RaftService) APIs() []rpc.API {
// Start implements node.Service, starting the background data propagation thread
// of the protocol.
func (service *RaftService) Start(*p2p.Server) error {
service.raftProtocolManager.Start()
func (service *RaftService) Start(p2pServer *p2p.Server) error {
service.raftProtocolManager.Start(p2pServer)
return nil
}
@ -95,6 +90,7 @@ func (service *RaftService) Start(*p2p.Server) error {
func (service *RaftService) Stop() error {
service.blockchain.Stop()
service.raftProtocolManager.Stop()
service.minter.stop()
service.eventMux.Stop()
service.chainDb.Close()

View File

@ -26,6 +26,8 @@ const (
snapshotPeriod = 250
peerUrlKeyPrefix = "peerUrl-"
chainExtensionMessage = "Successfully extended chain"
)
var (

View File

@ -6,8 +6,6 @@ This directory holds an implementation of a [Raft](https://raft.github.io)-based
When the `geth` binary is passed the `--raft` flag, the node will operate in "raft mode."
Currently Raft-based consensus requires that all nodes in the cluster are configured to list the others up-front as [static peers](https://github.com/ethereum/go-ethereum/wiki/Connecting-to-the-network#static-nodes). We will be adding support for dynamic membership changes in the near future.
## Some implementation basics
Note: Though we use the etcd implementation of the Raft protocol, we use "Raft" more broadly to refer to the protocol and to its use in achieving consensus for Quorum/Ethereum.
@ -146,6 +144,16 @@ There is currently no limit to the length of these speculative chains, but we pl
We communicate blocks over the HTTP transport layer built into etcd Raft. It's also (at least theoretically) possible to use the p2p protocol built into Ethereum as a transport for Raft. In our testing we found the default etcd HTTP transport to be more reliable than the p2p transport (at least as implemented in geth) under high load.
Quorum listens on port 50400 by default for the raft transport, but this is configurable with the `--raftport` flag.
## Initial configuration, and enacting membership changes
Currently Raft-based consensus requires that all _initial_ nodes in the cluster are configured to list the others up-front as [static peers](https://github.com/ethereum/go-ethereum/wiki/Connecting-to-the-network#static-nodes). These enode ID URIs _must_ include a `raftport` querystring parameter specifying the raft port for each peer: e.g. `enode://abcd@127.0.0.1:30400?raftport=50400`.
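For illustration only, a minimal `static-nodes.json` for an initial two-node cluster might look like the sketch below; the enode hashes, IP addresses, and port numbers are placeholders, and the detail that matters for raft mode is the `raftport` querystring parameter on each URI:
```
[
  "enode://<node-1-enode-hash>@127.0.0.1:30301?raftport=50401",
  "enode://<node-2-enode-hash>@127.0.0.2:30301?raftport=50402"
]
```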
To remove a node from the cluster, attach to a JS console and issue `raft.removePeer(raftId)`, where `raftId` is the number of the node you wish to remove. For initial nodes in the cluster, this number is the 1-indexed position of the node's enode ID in the static peers list. Removal is permanent: this raft ID can never reconnect to the cluster in the future, and the node's operator must re-join the cluster with a new raft ID.
To add a node to the cluster, attach to a JS console and issue `raft.addPeer(enodeId)`. Note that like the enode IDs listed in the static peers JSON file, this enode ID should include a `raftport` querystring parameter. This call will allocate and return a raft ID that was not already in use. After `addPeer`, start the new geth node with the flag `--raftjoinexisting RAFTID` in addition to `--raft`.
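As a sketch of the full flow (the enode URI, ports, and returned raft ID below are placeholders, not real values), adding a peer from the JS console of an existing member, starting it, and later removing it might look like this:
```
// On an existing cluster member, propose the new peer; the call returns the
// newly allocated raft ID (assume it returns 4 here):
raft.addPeer("enode://<new-node-enode-hash>@127.0.0.3:30301?raftport=50404")
// => 4

// Then start the new node out-of-band with:
//   geth --raft --raftport 50404 --raftjoinexisting 4 ...

// Later, to permanently remove that peer (by its raft ID) from the cluster:
raft.removePeer(4)
```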
## FAQ
### Could you have a single- or two-node cluster? More generally, could you have an even number of nodes?

View File

@ -2,7 +2,6 @@ package raft
import (
"fmt"
"math/big"
"net/http"
"net/url"
"os"
@ -31,111 +30,95 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/syndtr/goleveldb/leveldb"
"gopkg.in/fatih/set.v0"
)
// Overview of the channels used in this module:
//
// Node.
// * quitSync: *Every* channel operation can be unblocked by closing this
// channel.
//
// ProtocolManager.
// * proposeC, for proposals flowing from ethereum to raft
// * confChangeC, currently unused; in the future for adding new, non-initial, raft peers
// * roleC, coming from raft notifies us when our role changes
type ProtocolManager struct {
// peers note -- each node tracks the peers acknowledged by raft
//
// only the leader proposes `ConfChangeAddNode` for each peer in the first set
// but not in the second. this is done:
// * when a node becomes leader
// * when the leader learns of new peers
mu sync.RWMutex // For protecting concurrent JS access to "local peer" and "remote peer" state
quitSync chan struct{}
stopped bool
// This node's raft id
id int
// Static configuration
joinExisting bool // Whether to join an existing cluster when a WAL doesn't already exist
bootstrapNodes []*discover.Node
raftId uint16
raftPort uint16
// set of currently active peers known to the raft cluster. this includes self
raftPeers []etcdRaft.Peer
peerUrls []string
p2pNodes []*discover.Node
// Local peer state (protected by mu vs concurrent access via JS)
address *Address
role int // Role: minter or verifier
appliedIndex uint64 // The index of the last-applied raft entry
snapshotIndex uint64 // The index of the latest snapshot.
// Remote peer state (protected by mu vs concurrent access via JS)
peers map[uint16]*Peer
removedPeers *set.Set // *Permanently removed* peers
// P2P transport
p2pServer *p2p.Server // Initialized in start()
// Blockchain services
blockchain *core.BlockChain
downloader *downloader.Downloader
minter *minter
// to protect the raft peers and addresses
mu sync.RWMutex
// Blockchain events
eventMux *event.TypeMux
minedBlockSub event.Subscription
downloader *downloader.Downloader
peerGetter func() (string, *big.Int)
// Raft proposal events
blockProposalC chan *types.Block // for mined blocks to raft
confChangeProposalC chan raftpb.ConfChange // for config changes from js console to raft
rawNode etcdRaft.Node
raftStorage *etcdRaft.MemoryStorage
// Raft transport
unsafeRawNode etcdRaft.Node
transport *rafthttp.Transport
httpstopc chan struct{}
httpdonec chan struct{}
transport *rafthttp.Transport
httpstopc chan struct{}
httpdonec chan struct{}
// The number of entries applied to the raft log
appliedIndex uint64
// The index of the latest snapshot.
snapshotIndex uint64
// Snapshotting
// Raft snapshotting
snapshotter *snap.Snapshotter
snapdir string
confState raftpb.ConfState
// write-ahead log
// Raft write-ahead log
waldir string
wal *wal.WAL
// Persistence outside of the blockchain and raft log to keep track of our
// last-applied raft index and raft peer URLs.
quorumRaftDb *leveldb.DB
proposeC chan *types.Block
confChangeC chan raftpb.ConfChange
quitSync chan struct{}
// Note: we don't actually use this field. We just set it at the same time as
// starting or stopping the miner in notifyRoleChange. We might want to remove
// it, but it might also be useful to check.
role int
minter *minter
// Storage
quorumRaftDb *leveldb.DB // Persistent storage for last-applied raft index
raftStorage *etcdRaft.MemoryStorage // Volatile raft storage
}
//
// Public interface
//
func NewProtocolManager(id int, blockchain *core.BlockChain, mux *event.TypeMux, peers []*discover.Node, datadir string, minter *minter) (*ProtocolManager, error) {
func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockChain, mux *event.TypeMux, bootstrapNodes []*discover.Node, joinExisting bool, datadir string, minter *minter, downloader *downloader.Downloader) (*ProtocolManager, error) {
waldir := fmt.Sprintf("%s/raft-wal", datadir)
snapdir := fmt.Sprintf("%s/raft-snap", datadir)
quorumRaftDbLoc := fmt.Sprintf("%s/quorum-raft-state", datadir)
peerUrls := makePeerUrls(peers)
manager := &ProtocolManager{
raftPeers: makeRaftPeers(peerUrls),
peerUrls: peerUrls,
p2pNodes: peers,
blockchain: blockchain,
eventMux: mux,
proposeC: make(chan *types.Block),
confChangeC: make(chan raftpb.ConfChange),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
waldir: waldir,
snapdir: snapdir,
snapshotter: snap.New(snapdir),
id: id,
quitSync: make(chan struct{}),
raftStorage: etcdRaft.NewMemoryStorage(),
minter: minter,
bootstrapNodes: bootstrapNodes,
peers: make(map[uint16]*Peer),
removedPeers: set.New(),
joinExisting: joinExisting,
blockchain: blockchain,
eventMux: mux,
blockProposalC: make(chan *types.Block),
confChangeProposalC: make(chan raftpb.ConfChange),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
waldir: waldir,
snapdir: snapdir,
snapshotter: snap.New(snapdir),
raftId: raftId,
raftPort: raftPort,
quitSync: make(chan struct{}),
raftStorage: etcdRaft.NewMemoryStorage(),
minter: minter,
downloader: downloader,
}
if db, err := openQuorumRaftDb(quorumRaftDbLoc); err != nil {
@ -147,36 +130,56 @@ func NewProtocolManager(id int, blockchain *core.BlockChain, mux *event.TypeMux,
return manager, nil
}
func (pm *ProtocolManager) Start() {
func (pm *ProtocolManager) Start(p2pServer *p2p.Server) {
glog.V(logger.Info).Infoln("starting raft protocol handler")
pm.p2pServer = p2pServer
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop(pm.proposeC)
pm.startRaftNode()
pm.startRaft()
go pm.minedBroadcastLoop()
}
func (pm *ProtocolManager) Stop() {
pm.mu.Lock()
defer pm.mu.Unlock()
defer glog.V(logger.Info).Infoln("raft protocol handler stopped")
if pm.stopped {
return
}
glog.V(logger.Info).Infoln("stopping raft protocol handler...")
for raftId, peer := range pm.peers {
pm.disconnectFromPeer(raftId, peer)
}
pm.minedBlockSub.Unsubscribe()
pm.transport.Stop()
if pm.transport != nil {
pm.transport.Stop()
}
close(pm.httpstopc)
<-pm.httpdonec
close(pm.quitSync)
if pm.rawNode != nil {
pm.rawNode.Stop()
if pm.unsafeRawNode != nil {
pm.unsafeRawNode.Stop()
}
pm.quorumRaftDb.Close()
pm.p2pServer = nil
pm.minter.stop()
glog.V(logger.Info).Infoln("raft protocol handler stopped")
pm.stopped = true
}
func (pm *ProtocolManager) NodeInfo() *RaftNodeInfo {
pm.mu.RLock() // as we read pm.role
pm.mu.RLock() // as we read role and peers
defer pm.mu.RUnlock()
var roleDescription string
@ -186,11 +189,136 @@ func (pm *ProtocolManager) NodeInfo() *RaftNodeInfo {
roleDescription = "verifier"
}
peerAddresses := make([]*Address, len(pm.peers))
peerIdx := 0
for _, peer := range pm.peers {
peerAddresses[peerIdx] = peer.address
peerIdx += 1
}
removedPeerIfaces := pm.removedPeers.List()
removedPeerIds := make([]uint16, len(removedPeerIfaces))
for i, removedIface := range removedPeerIfaces {
removedPeerIds[i] = removedIface.(uint16)
}
//
// NOTE: before exposing any new fields here, make sure that the underlying
// ProtocolManager members are protected from concurrent access by pm.mu!
//
return &RaftNodeInfo{
ClusterSize: len(pm.raftPeers),
Genesis: pm.blockchain.Genesis().Hash(),
Head: pm.blockchain.CurrentBlock().Hash(),
Role: roleDescription,
ClusterSize: len(pm.peers) + 1,
Role: roleDescription,
Address: pm.address,
PeerAddresses: peerAddresses,
RemovedPeerIds: removedPeerIds,
AppliedIndex: pm.appliedIndex,
SnapshotIndex: pm.snapshotIndex,
}
}
// There seems to be a very rare race in raft where during `etcdRaft.StartNode`
// it will call back our `Process` method before it's finished returning the
// `raft.Node`, `pm.unsafeRawNode`, to us. This re-entrance through a separate
// thread will cause a nil pointer dereference. To work around this, this
// getter method should be used instead of reading `pm.unsafeRawNode` directly.
func (pm *ProtocolManager) rawNode() etcdRaft.Node {
for pm.unsafeRawNode == nil {
time.Sleep(100 * time.Millisecond)
}
return pm.unsafeRawNode
}
func (pm *ProtocolManager) nextRaftId() uint16 {
pm.mu.RLock()
defer pm.mu.RUnlock()
maxId := pm.raftId
for peerId := range pm.peers {
if maxId < peerId {
maxId = peerId
}
}
removedPeerIfaces := pm.removedPeers.List()
for _, removedIface := range removedPeerIfaces {
removedId := removedIface.(uint16)
if maxId < removedId {
maxId = removedId
}
}
return maxId + 1
}
func (pm *ProtocolManager) isRaftIdRemoved(id uint16) bool {
pm.mu.RLock()
defer pm.mu.RUnlock()
return pm.removedPeers.Has(id)
}
func (pm *ProtocolManager) isRaftIdUsed(raftId uint16) bool {
if pm.raftId == raftId || pm.isRaftIdRemoved(raftId) {
return true
}
pm.mu.RLock()
defer pm.mu.RUnlock()
return pm.peers[raftId] != nil
}
func (pm *ProtocolManager) isP2pNodeInCluster(node *discover.Node) bool {
pm.mu.RLock()
defer pm.mu.RUnlock()
for _, peer := range pm.peers {
if peer.p2pNode.ID == node.ID {
return true
}
}
return false
}
func (pm *ProtocolManager) ProposeNewPeer(enodeId string) (uint16, error) {
node, err := discover.ParseNode(enodeId)
if err != nil {
return 0, err
}
if pm.isP2pNodeInCluster(node) {
return 0, fmt.Errorf("node is already in the cluster: %v", enodeId)
}
if len(node.IP) != 4 {
return 0, fmt.Errorf("expected IPv4 address (with length 4), but got IP of length %v", len(node.IP))
}
if !node.HasRaftPort() {
return 0, fmt.Errorf("enodeId is missing raftport querystring parameter: %v", enodeId)
}
raftId := pm.nextRaftId()
address := newAddress(raftId, node.RaftPort, node)
pm.confChangeProposalC <- raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: uint64(raftId),
Context: address.toBytes(),
}
return raftId, nil
}
func (pm *ProtocolManager) ProposePeerRemoval(raftId uint16) {
pm.confChangeProposalC <- raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: uint64(raftId),
}
}
@ -203,7 +331,7 @@ func (pm *ProtocolManager) WriteMsg(msg p2p.Msg) error {
var buffer = make([]byte, msg.Size)
msg.Payload.Read(buffer)
return pm.rawNode.Propose(context.TODO(), buffer)
return pm.rawNode().Propose(context.TODO(), buffer)
}
//
@ -211,51 +339,86 @@ func (pm *ProtocolManager) WriteMsg(msg p2p.Msg) error {
//
func (pm *ProtocolManager) Process(ctx context.Context, m raftpb.Message) error {
return pm.rawNode.Step(ctx, m)
return pm.rawNode().Step(ctx, m)
}
func (pm *ProtocolManager) IsIDRemoved(id uint64) bool {
// TODO: implement this in the future once we support dynamic cluster membership
glog.V(logger.Info).Infof("reporting that raft ID %d is not removed", id)
return false
return pm.isRaftIdRemoved(uint16(id))
}
func (pm *ProtocolManager) ReportUnreachable(id uint64) {
glog.V(logger.Warn).Infof("peer %d is currently unreachable", id)
pm.rawNode.ReportUnreachable(id)
pm.rawNode().ReportUnreachable(id)
}
func (pm *ProtocolManager) ReportSnapshot(id uint64, status etcdRaft.SnapshotStatus) {
glog.V(logger.Info).Infof("status of last-sent snapshot: %v", status)
pm.rawNode.ReportSnapshot(id, status)
if status == etcdRaft.SnapshotFailure {
glog.V(logger.Info).Infof("failed to send snapshot to raft peer %v", id)
} else if status == etcdRaft.SnapshotFinish {
glog.V(logger.Info).Infof("finished sending snapshot to raft peer %v", id)
}
pm.rawNode().ReportSnapshot(id, status)
}
//
// Private methods
//
func (pm *ProtocolManager) startRaftNode() {
func (pm *ProtocolManager) startRaft() {
if !fileutil.Exist(pm.snapdir) {
if err := os.Mkdir(pm.snapdir, 0750); err != nil {
glog.Fatalf("cannot create dir for snapshot (%v)", err)
}
}
walExisted := wal.Exist(pm.waldir)
lastAppliedIndex := pm.loadAppliedIndex()
pm.wal = pm.replayWAL()
ss := &stats.ServerStats{}
ss.Initialize()
pm.transport = &rafthttp.Transport{
ID: raftTypes.ID(pm.raftId),
ClusterID: 0x1000,
Raft: pm,
ServerStats: ss,
LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(pm.raftId))),
ErrorC: make(chan error),
}
pm.transport.Start()
// We load the snapshot to connect to prev peers before replaying the WAL,
// which typically goes further into the future than the snapshot.
var maybeRaftSnapshot *raftpb.Snapshot
if walExisted {
maybeRaftSnapshot = pm.loadSnapshot() // re-establishes peer connections
}
pm.wal = pm.replayWAL(maybeRaftSnapshot)
if walExisted {
if hardState, _, err := pm.raftStorage.InitialState(); err != nil {
panic(fmt.Sprintf("failed to read initial state from raft while restarting: %v", err))
} else {
if lastPersistedCommittedIndex := hardState.Commit; lastPersistedCommittedIndex < lastAppliedIndex {
glog.V(logger.Warn).Infof("rolling back applied index from %v to last-durably-committed %v", lastAppliedIndex, lastPersistedCommittedIndex)
// Roll back our applied index. See the logic and explanation around
// the single call to `pm.applyNewChainHead` for more context.
lastAppliedIndex = lastPersistedCommittedIndex
}
}
}
// NOTE: cockroach sets this to false for now until they've "worked out the
// bugs"
enablePreVote := true
lastAppliedIndex := pm.loadAppliedIndex()
c := &etcdRaft.Config{
raftConfig := &etcdRaft.Config{
Applied: lastAppliedIndex,
ID: uint64(pm.id),
ID: uint64(pm.raftId),
ElectionTick: 10, // NOTE: cockroach sets this to 15
HeartbeatTick: 1, // NOTE: cockroach sets this to 5
Storage: pm.raftStorage,
@ -283,44 +446,59 @@ func (pm *ProtocolManager) startRaftNode() {
MaxInflightMsgs: 256, // NOTE: in cockroachdb this is 4
}
glog.V(logger.Info).Infof("local raft ID is %v", c.ID)
ss := &stats.ServerStats{}
ss.Initialize()
pm.transport = &rafthttp.Transport{
ID: raftTypes.ID(pm.id),
ClusterID: 0x1000,
Raft: pm,
ServerStats: ss,
LeaderStats: stats.NewLeaderStats(strconv.Itoa(pm.id)),
ErrorC: make(chan error),
}
pm.transport.Start()
glog.V(logger.Info).Infof("local raft ID is %v", raftConfig.ID)
if walExisted {
pm.reconnectToPreviousPeers()
pm.rawNode = etcdRaft.RestartNode(c)
glog.V(logger.Info).Infof("remounting an existing raft log; connecting to peers.")
pm.unsafeRawNode = etcdRaft.RestartNode(raftConfig)
} else if pm.joinExisting {
glog.V(logger.Info).Infof("newly joining an existing cluster; waiting for connections.")
pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, nil)
} else {
if numPeers := len(pm.raftPeers); numPeers == 0 {
if numPeers := len(pm.bootstrapNodes); numPeers == 0 {
panic("exiting due to empty raft peers list")
} else {
glog.V(logger.Info).Infof("starting raft with %v total peers.", numPeers)
glog.V(logger.Info).Infof("starting a new raft log with an initial cluster size of %v.", numPeers)
}
pm.rawNode = etcdRaft.StartNode(c, pm.raftPeers)
raftPeers, peerAddresses, localAddress := pm.makeInitialRaftPeers()
pm.setLocalAddress(localAddress)
// We add all peers up-front even though we will see a ConfChangeAddNode
// for each shortly. This is because raft's ConfState will contain all of
// these nodes before we see these log entries, and we always want our
// snapshots to have all addresses for each of the nodes in the ConfState.
for _, peerAddress := range peerAddresses {
pm.addPeer(peerAddress)
}
pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, raftPeers)
}
go pm.serveRaft()
go pm.serveInternal(pm.proposeC, pm.confChangeC)
go pm.serveLocalProposals()
go pm.eventLoop()
go pm.handleRoleChange(pm.rawNode.RoleChan().Out())
go pm.handleRoleChange(pm.rawNode().RoleChan().Out())
}
func (pm *ProtocolManager) setLocalAddress(addr *Address) {
pm.mu.Lock()
pm.address = addr
pm.mu.Unlock()
// By setting `URLs` on the raft transport, we advertise our URL (in an HTTP
// header) to any recipient. This is necessary for a newcomer to the cluster
// to be able to accept a snapshot from us to bootstrap them.
if urls, err := raftTypes.NewURLs([]string{raftUrl(addr)}); err == nil {
pm.transport.URLs = urls
} else {
panic(fmt.Sprintf("error: could not create URL from local address: %v", addr))
}
}
func (pm *ProtocolManager) serveRaft() {
urlString := fmt.Sprintf("http://0.0.0.0:%d", nodeHttpPort(pm.p2pNodes[pm.id-1]))
urlString := fmt.Sprintf("http://0.0.0.0:%d", pm.raftPort)
url, err := url.Parse(urlString)
if err != nil {
glog.Fatalf("Failed parsing URL (%v)", err)
@ -367,12 +545,12 @@ func (pm *ProtocolManager) handleRoleChange(roleC <-chan interface{}) {
}
}
func (pm *ProtocolManager) minedBroadcastLoop(proposeC chan<- *types.Block) {
func (pm *ProtocolManager) minedBroadcastLoop() {
for obj := range pm.minedBlockSub.Chan() {
switch ev := obj.Data.(type) {
case core.NewMinedBlockEvent:
select {
case proposeC <- ev.Block:
case pm.blockProposalC <- ev.Block:
case <-pm.quitSync:
return
}
@ -380,9 +558,8 @@ func (pm *ProtocolManager) minedBroadcastLoop(proposeC chan<- *types.Block) {
}
}
// serve two channels (proposeC, confChangeC) to handle changes originating
// internally
func (pm *ProtocolManager) serveInternal(proposeC <-chan *types.Block, confChangeC <-chan raftpb.ConfChange) {
// Serve two channels to handle new blocks and raft configuration changes originating locally.
func (pm *ProtocolManager) serveLocalProposals() {
//
// TODO: does it matter that this will restart from 0 whenever we restart a cluster?
//
@ -390,7 +567,7 @@ func (pm *ProtocolManager) serveInternal(proposeC <-chan *types.Block, confChang
for {
select {
case block, ok := <-proposeC:
case block, ok := <-pm.blockProposalC:
if !ok {
glog.V(logger.Info).Infoln("error: read from proposeC failed")
return
@ -404,8 +581,8 @@ func (pm *ProtocolManager) serveInternal(proposeC <-chan *types.Block, confChang
r.Read(buffer)
// blocks until accepted by the raft state machine
pm.rawNode.Propose(context.TODO(), buffer)
case cc, ok := <-confChangeC:
pm.rawNode().Propose(context.TODO(), buffer)
case cc, ok := <-pm.confChangeProposalC:
if !ok {
glog.V(logger.Info).Infoln("error: read from confChangeC failed")
return
@ -413,20 +590,22 @@ func (pm *ProtocolManager) serveInternal(proposeC <-chan *types.Block, confChang
confChangeCount++
cc.ID = confChangeCount
pm.rawNode.ProposeConfChange(context.TODO(), cc)
pm.rawNode().ProposeConfChange(context.TODO(), cc)
case <-pm.quitSync:
return
}
}
}
func (pm *ProtocolManager) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
if len(ents) == 0 {
func (pm *ProtocolManager) entriesToApply(allEntries []raftpb.Entry) (entriesToApply []raftpb.Entry) {
if len(allEntries) == 0 {
return
}
first := ents[0].Index
first := allEntries[0].Index
pm.mu.RLock()
lastApplied := pm.appliedIndex
pm.mu.RUnlock()
if first > lastApplied+1 {
glog.Fatalf("first index of committed entry[%d] should <= appliedIndex[%d] + 1", first, lastApplied)
@ -434,30 +613,53 @@ func (pm *ProtocolManager) entriesToApply(ents []raftpb.Entry) (nents []raftpb.E
firstToApply := lastApplied - first + 1
if firstToApply < uint64(len(ents)) {
nents = ents[firstToApply:]
if firstToApply < uint64(len(allEntries)) {
entriesToApply = allEntries[firstToApply:]
}
return
}
func (pm *ProtocolManager) addPeer(nodeId uint64, peerUrl string) {
pm.transport.AddPeer(raftTypes.ID(nodeId), []string{peerUrl})
func raftUrl(address *Address) string {
return fmt.Sprintf("http://%s:%d", address.ip, address.raftPort)
}
func (pm *ProtocolManager) removePeer(nodeId uint64) {
pm.transport.RemovePeer(raftTypes.ID(nodeId))
func (pm *ProtocolManager) addPeer(address *Address) {
pm.mu.Lock()
defer pm.mu.Unlock()
raftId := address.raftId
// Add P2P connection:
p2pNode := discover.NewNode(address.nodeId, address.ip, 0, uint16(address.p2pPort))
pm.p2pServer.AddPeer(p2pNode)
// Add raft transport connection:
pm.transport.AddPeer(raftTypes.ID(raftId), []string{raftUrl(address)})
pm.peers[raftId] = &Peer{address, p2pNode}
}
func (pm *ProtocolManager) reconnectToPreviousPeers() {
_, confState, _ := pm.raftStorage.InitialState()
func (pm *ProtocolManager) disconnectFromPeer(raftId uint16, peer *Peer) {
pm.p2pServer.RemovePeer(peer.p2pNode)
pm.transport.RemovePeer(raftTypes.ID(raftId))
}
for _, nodeId := range confState.Nodes {
peerUrl := pm.loadPeerUrl(nodeId)
func (pm *ProtocolManager) removePeer(raftId uint16) {
pm.mu.Lock()
defer pm.mu.Unlock()
if nodeId != uint64(pm.id) {
pm.addPeer(nodeId, peerUrl)
}
if peer := pm.peers[raftId]; peer != nil {
pm.disconnectFromPeer(raftId, peer)
delete(pm.peers, raftId)
}
// This is only necessary sometimes, but it's idempotent. Also, we *always*
// do this, and not just when there's still a peer in the map, because we
// need to do it for our *own* raft ID before we get booted from the cluster
// so that snapshots are identical on all nodes. It's important for a booted
// node to have a snapshot identical to every other node because that node
// can potentially re-enter the cluster with a new raft ID.
pm.removedPeers.Add(raftId)
}
func (pm *ProtocolManager) eventLoop() {
@ -465,19 +667,22 @@ func (pm *ProtocolManager) eventLoop() {
defer ticker.Stop()
defer pm.wal.Close()
exitAfterApplying := false
for {
select {
case <-ticker.C:
pm.rawNode.Tick()
pm.rawNode().Tick()
// when the node is first ready it gives us entries to commit and messages
// to immediately publish
case rd := <-pm.rawNode.Ready():
case rd := <-pm.rawNode().Ready():
pm.wal.Save(rd.HardState, rd.Entries)
if snap := rd.Snapshot; !etcdRaft.IsEmptySnap(snap) {
pm.saveSnapshot(snap)
pm.applySnapshot(snap)
pm.saveRaftSnapshot(snap)
pm.applyRaftSnapshot(snap)
pm.advanceAppliedIndex(snap.Metadata.Index)
}
// 1: Write HardState, Entries, and Snapshot to persistent storage if they
@ -499,67 +704,102 @@ func (pm *ProtocolManager) eventLoop() {
if err != nil {
glog.V(logger.Error).Infoln("error decoding block: ", err)
}
pm.applyNewChainHead(&block)
if pm.blockchain.HasBlock(block.Hash()) {
// This can happen:
//
// if (1) we crashed after applying this block to the chain, but
// before writing appliedIndex to LDB.
// or (2) we crashed in a scenario where we applied further than
// raft *durably persisted* its committed index (see
// https://github.com/coreos/etcd/pull/7899). In this
// scenario, when the node comes back up, we will re-apply
// a few entries.
headBlockHash := pm.blockchain.CurrentBlock().Hash()
glog.V(logger.Warn).Infof("not applying already-applied block: %x (parent is %x; current head is %x)\n", block.Hash(), block.ParentHash(), headBlockHash)
} else {
pm.applyNewChainHead(&block)
}
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
raftId := uint16(cc.NodeID)
// We lock access to this, in case we want to read the list of
// cluster members concurrently via RPC (e.g. from NodeInfo()):
pm.mu.Lock()
pm.confState = *pm.rawNode.ApplyConfChange(cc)
pm.mu.Unlock()
pm.confState = *pm.rawNode().ApplyConfChange(cc)
forceSnapshot := false
switch cc.Type {
case raftpb.ConfChangeAddNode:
glog.V(logger.Info).Infof("adding peer %v due to ConfChangeAddNode", cc.NodeID)
if pm.isRaftIdRemoved(raftId) {
glog.V(logger.Info).Infof("ignoring ConfChangeAddNode for permanently-removed peer %v", raftId)
} else if raftId <= uint16(len(pm.bootstrapNodes)) {
// See initial cluster logic in startRaft() for more information.
glog.V(logger.Info).Infof("ignoring expected ConfChangeAddNode for initial peer %v", raftId)
nodeId := cc.NodeID
peerUrl := string(cc.Context)
// We need a snapshot to exist to reconnect to peers on start-up after a crash.
forceSnapshot = true
} else if pm.isRaftIdUsed(raftId) {
glog.V(logger.Info).Infof("ignoring ConfChangeAddNode for already-used raft ID %v", raftId)
} else {
glog.V(logger.Info).Infof("adding peer %v due to ConfChangeAddNode", raftId)
if nodeId != uint64(pm.id) {
pm.addPeer(nodeId, peerUrl)
forceSnapshot = true
pm.addPeer(bytesToAddress(cc.Context))
}
pm.writePeerUrl(nodeId, peerUrl)
case raftpb.ConfChangeRemoveNode:
glog.V(logger.Info).Infof("removing peer %v due to ConfChangeRemoveNode", cc.NodeID)
if pm.isRaftIdRemoved(raftId) {
glog.V(logger.Info).Infof("ignoring ConfChangeRemoveNode for already-removed peer %v", raftId)
} else {
glog.V(logger.Info).Infof("removing peer %v due to ConfChangeRemoveNode", raftId)
if cc.NodeID == uint64(pm.id) {
glog.V(logger.Warn).Infoln("removing self from the cluster due to ConfChangeRemoveNode")
forceSnapshot = true
pm.advanceAppliedIndex(entry.Index)
if raftId == pm.raftId {
exitAfterApplying = true
}
// TODO: we might want to completely exit(0) geth here
return
pm.removePeer(raftId)
}
pm.removePeer(cc.NodeID)
case raftpb.ConfChangeUpdateNode:
// NOTE: remember to forceSnapshot in this case, if we add support
// for this.
glog.Fatalln("not yet handled: ConfChangeUpdateNode")
}
// We force a snapshot here to persist our updated confState, so we
// know our fellow cluster members when we come back online.
//
// It is critical here to snapshot *before* writing our applied
// index in LevelDB, otherwise a crash while/before snapshotting
// (after advancing our applied index) would result in the loss of a
// cluster member upon restart: we would re-mount with an old
// ConfState.
pm.triggerSnapshotWithNextIndex(entry.Index)
if forceSnapshot {
// We force a snapshot here to persist our updated confState, so we
// know our fellow cluster members when we come back online.
//
// It is critical here to snapshot *before* writing our applied
// index in LevelDB, otherwise a crash while/before snapshotting
// (after advancing our applied index) would result in the loss of a
// cluster member upon restart: we would re-mount with an old
// ConfState.
pm.triggerSnapshot(entry.Index)
}
}
pm.advanceAppliedIndex(entry.Index)
}
pm.maybeTriggerSnapshot()
if exitAfterApplying {
glog.V(logger.Warn).Infoln("permanently removing self from the cluster")
pm.Stop()
glog.V(logger.Warn).Infoln("permanently exited the cluster")
return
}
// 4: Call Node.Advance() to signal readiness for the next batch of
// updates.
pm.maybeTriggerSnapshot()
pm.rawNode.Advance()
pm.rawNode().Advance()
case <-pm.quitSync:
return
@ -567,38 +807,33 @@ func (pm *ProtocolManager) eventLoop() {
}
}
func makeRaftPeers(urls []string) []etcdRaft.Peer {
peers := make([]etcdRaft.Peer, len(urls))
for i, url := range urls {
peerId := i + 1
func (pm *ProtocolManager) makeInitialRaftPeers() (raftPeers []etcdRaft.Peer, peerAddresses []*Address, localAddress *Address) {
initialNodes := pm.bootstrapNodes
raftPeers = make([]etcdRaft.Peer, len(initialNodes)) // Entire cluster
peerAddresses = make([]*Address, len(initialNodes)-1) // Cluster without *this* node
peers[i] = etcdRaft.Peer{
ID: uint64(peerId),
Context: []byte(url),
peersSeen := 0
for i, node := range initialNodes {
raftId := uint16(i + 1)
// We initially get the raftPort from the enode ID's query string. As an alternative, we can move away from
// requiring the use of static peers for the initial set, and load them from e.g. another JSON file which
// contains pairs of enodes and raft ports, or we can get this initial peer list from commandline flags.
address := newAddress(raftId, node.RaftPort, node)
raftPeers[i] = etcdRaft.Peer{
ID: uint64(raftId),
Context: address.toBytes(),
}
if raftId == pm.raftId {
localAddress = address
} else {
peerAddresses[peersSeen] = address
peersSeen += 1
}
}
return peers
}
func nodeHttpPort(node *discover.Node) uint16 {
//
// TODO: we should probably read this from the commandline, but it's a little trickier because we wouldn't be
// accepting a single port like with --port or --rpcport; we'd have to ask for a base HTTP port (e.g. 50400)
// with the convention/understanding that the port used by each node would be base + raft ID, which quorum is
// otherwise not aware of.
//
return 20000 + node.TCP
}
func makePeerUrls(nodes []*discover.Node) []string {
urls := make([]string, len(nodes))
for i, node := range nodes {
ip := node.IP.String()
port := nodeHttpPort(node)
urls[i] = fmt.Sprintf("http://%s:%d", ip, port)
}
return urls
return
}
func sleep(duration time.Duration) {
@ -633,13 +868,15 @@ func (pm *ProtocolManager) applyNewChainHead(block *types.Block) {
panic(fmt.Sprintf("failed to extend chain: %s", err.Error()))
}
glog.V(logger.Info).Infof("Successfully extended chain: %x\n", block.Hash())
glog.V(logger.Info).Infof("%s: %x\n", chainExtensionMessage, block.Hash())
}
}
// Sets new appliedIndex in-memory, *and* writes this appliedIndex to LevelDB.
func (pm *ProtocolManager) advanceAppliedIndex(index uint64) {
pm.appliedIndex = index
pm.writeAppliedIndex(index)
pm.mu.Lock()
pm.appliedIndex = index
pm.mu.Unlock()
}

raft/peer.go (new file, 80 lines)
View File

@ -0,0 +1,80 @@
package raft
import (
"io"
"net"
"fmt"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"
"log"
)
// Serializable information about a Peer. Sufficient to build `etcdRaft.Peer`
// or `discover.Node`.
type Address struct {
raftId uint16
nodeId discover.NodeID
ip net.IP
p2pPort uint16
raftPort uint16
}
func newAddress(raftId uint16, raftPort uint16, node *discover.Node) *Address {
return &Address{
raftId: raftId,
nodeId: node.ID,
ip: node.IP,
p2pPort: node.TCP,
raftPort: raftPort,
}
}
// A peer that we're connected to via both raft's HTTP transport and ethereum p2p
type Peer struct {
address *Address // For raft transport
p2pNode *discover.Node // For ethereum transport
}
func (addr *Address) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{addr.raftId, addr.nodeId, addr.ip, addr.p2pPort, addr.raftPort})
}
func (addr *Address) DecodeRLP(s *rlp.Stream) error {
// These fields need to be public:
var temp struct {
RaftId uint16
NodeId discover.NodeID
Ip net.IP
P2pPort uint16
RaftPort uint16
}
if err := s.Decode(&temp); err != nil {
return err
} else {
addr.raftId, addr.nodeId, addr.ip, addr.p2pPort, addr.raftPort = temp.RaftId, temp.NodeId, temp.Ip, temp.P2pPort, temp.RaftPort
return nil
}
}
// RLP Address encoding, for transport over raft and storage in LevelDB.
func (addr *Address) toBytes() []byte {
size, r, err := rlp.EncodeToReader(addr)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Address: %s", err.Error()))
}
var buffer = make([]byte, uint32(size))
r.Read(buffer)
return buffer
}
func bytesToAddress(bytes []byte) *Address {
var addr Address
if err := rlp.DecodeBytes(bytes, &addr); err != nil {
log.Fatalf("failed to RLP-decode Address: %v", err)
}
return &addr
}

View File

@ -16,11 +16,6 @@ var (
NoWriteMerge: false,
Sync: false,
}
mustFsync = &opt.WriteOptions{
NoWriteMerge: false,
Sync: true,
}
)
func openQuorumRaftDb(path string) (db *leveldb.DB, err error) {
@ -46,30 +41,18 @@ func (pm *ProtocolManager) loadAppliedIndex() uint64 {
lastAppliedIndex = binary.LittleEndian.Uint64(dat)
}
glog.V(logger.Info).Infof("Persistent applied index load: %d", lastAppliedIndex)
pm.mu.Lock()
pm.appliedIndex = lastAppliedIndex
pm.mu.Unlock()
glog.V(logger.Info).Infof("loaded the latest applied index: %d", lastAppliedIndex)
return lastAppliedIndex
}
func (pm *ProtocolManager) writeAppliedIndex(index uint64) {
glog.V(logger.Info).Infof("Persistent applied index write: %d", index)
glog.V(logger.Info).Infof("persisted the latest applied index: %d", index)
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, index)
pm.quorumRaftDb.Put(appliedDbKey, buf, noFsync)
}
func (pm *ProtocolManager) loadPeerUrl(nodeId uint64) string {
peerUrlKey := []byte(peerUrlKeyPrefix + string(nodeId))
value, err := pm.quorumRaftDb.Get(peerUrlKey, nil)
if err != nil {
glog.Fatalf("failed to read peer url for peer %d from leveldb: %v", nodeId, err)
}
return string(value)
}
func (pm *ProtocolManager) writePeerUrl(nodeId uint64, url string) {
key := []byte(peerUrlKeyPrefix + string(nodeId))
value := []byte(url)
pm.quorumRaftDb.Put(key, value, mustFsync)
}

View File

@ -1,14 +1,237 @@
package raft
import (
"fmt"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal/walpb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0"
"io"
"log"
"math/big"
"sort"
"time"
)
func (pm *ProtocolManager) saveSnapshot(snap raftpb.Snapshot) error {
// Snapshot
type Snapshot struct {
addresses []Address
removedRaftIds []uint16 // Raft IDs for permanently removed peers
headBlockHash common.Hash
}
type ByRaftId []Address
func (a ByRaftId) Len() int { return len(a) }
func (a ByRaftId) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByRaftId) Less(i, j int) bool { return a[i].raftId < a[j].raftId }
func (pm *ProtocolManager) buildSnapshot() *Snapshot {
pm.mu.RLock()
defer pm.mu.RUnlock()
numNodes := len(pm.confState.Nodes)
numRemovedNodes := pm.removedPeers.Size()
snapshot := &Snapshot{
addresses: make([]Address, numNodes),
removedRaftIds: make([]uint16, numRemovedNodes),
headBlockHash: pm.blockchain.CurrentBlock().Hash(),
}
// Populate addresses
for i, rawRaftId := range pm.confState.Nodes {
raftId := uint16(rawRaftId)
if raftId == pm.raftId {
snapshot.addresses[i] = *pm.address
} else {
snapshot.addresses[i] = *pm.peers[raftId].address
}
}
sort.Sort(ByRaftId(snapshot.addresses))
// Populate removed IDs
for i, removedIface := range pm.removedPeers.List() {
snapshot.removedRaftIds[i] = removedIface.(uint16)
}
return snapshot
}
// Note that we do *not* read `pm.appliedIndex` here. We only use the `index`
// parameter instead. This is because we need to support a scenario when we
// snapshot for a future index that we have not yet recorded in LevelDB. See
// comments around the use of `forceSnapshot`.
func (pm *ProtocolManager) triggerSnapshot(index uint64) {
pm.mu.RLock()
snapshotIndex := pm.snapshotIndex
pm.mu.RUnlock()
glog.V(logger.Info).Infof("start snapshot [applied index: %d | last snapshot index: %d]", index, snapshotIndex)
snapData := pm.buildSnapshot().toBytes()
snap, err := pm.raftStorage.CreateSnapshot(index, &pm.confState, snapData)
if err != nil {
panic(err)
}
if err := pm.saveRaftSnapshot(snap); err != nil {
panic(err)
}
// Discard all log entries prior to index.
if err := pm.raftStorage.Compact(index); err != nil {
panic(err)
}
glog.V(logger.Info).Infof("compacted log at index %d", index)
pm.mu.Lock()
pm.snapshotIndex = index
pm.mu.Unlock()
}
func confStateIdSet(confState raftpb.ConfState) *set.Set {
set := set.New()
for _, rawRaftId := range confState.Nodes {
set.Add(uint16(rawRaftId))
}
return set
}
func (pm *ProtocolManager) updateClusterMembership(newConfState raftpb.ConfState, addresses []Address, removedRaftIds []uint16) {
glog.V(logger.Info).Infof("updating cluster membership per raft snapshot")
prevConfState := pm.confState
// Update tombstones for permanently removed peers. For simplicity we do not
// allow the re-use of peer IDs once a peer is removed.
removedPeers := set.New()
for _, removedRaftId := range removedRaftIds {
removedPeers.Add(removedRaftId)
}
pm.mu.Lock()
pm.removedPeers = removedPeers
pm.mu.Unlock()
// Remove old peers that we're still connected to
prevIds := confStateIdSet(prevConfState)
newIds := confStateIdSet(newConfState)
idsToRemove := set.Difference(prevIds, newIds)
for _, idIfaceToRemove := range idsToRemove.List() {
raftId := idIfaceToRemove.(uint16)
glog.V(logger.Info).Infof("removing old raft peer %v", raftId)
pm.removePeer(raftId)
}
// Update local and remote addresses
for _, tempAddress := range addresses {
address := tempAddress // Allocate separately on the heap for each iteration.
if address.raftId == pm.raftId {
// If we're a newcomer to an existing cluster, this is where we learn
// our own Address.
pm.setLocalAddress(&address)
} else {
pm.mu.RLock()
existingPeer := pm.peers[address.raftId]
pm.mu.RUnlock()
if existingPeer == nil {
glog.V(logger.Info).Infof("adding new raft peer %v", address.raftId)
pm.addPeer(&address)
}
}
}
pm.mu.Lock()
pm.confState = newConfState
pm.mu.Unlock()
glog.V(logger.Info).Infof("updated cluster membership")
}
func (pm *ProtocolManager) maybeTriggerSnapshot() {
pm.mu.RLock()
appliedIndex := pm.appliedIndex
entriesSinceLastSnap := appliedIndex - pm.snapshotIndex
pm.mu.RUnlock()
if entriesSinceLastSnap < snapshotPeriod {
return
}
pm.triggerSnapshot(appliedIndex)
}
func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
if raftSnapshot := pm.readRaftSnapshot(); raftSnapshot != nil {
glog.V(logger.Info).Infof("loading snapshot")
pm.applyRaftSnapshot(*raftSnapshot)
return raftSnapshot
} else {
glog.V(logger.Info).Infof("no snapshot to load")
return nil
}
}
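// The raft library stores snapshot data as opaque bytes, so we RLP-encode our
// Snapshot when creating a raft snapshot and decode it again when applying one.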
func (snapshot *Snapshot) toBytes() []byte {
size, r, err := rlp.EncodeToReader(snapshot)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Snapshot: %s", err.Error()))
}
var buffer = make([]byte, uint32(size))
// Use io.ReadFull so a short read (or read error) cannot silently truncate
// the encoded snapshot.
if _, err := io.ReadFull(r, buffer); err != nil {
panic(fmt.Sprintf("error: failed to read RLP-encoded Snapshot: %s", err.Error()))
}
return buffer
}
func bytesToSnapshot(bytes []byte) *Snapshot {
var snapshot Snapshot
if err := rlp.DecodeBytes(bytes, &snapshot); err != nil {
log.Fatalf("failed to RLP-decode Snapshot: %v", err)
}
return &snapshot
}
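// Snapshot's fields are unexported, so we implement the rlp.Encoder and
// rlp.Decoder interfaces by hand, (de)serializing the three fields as a flat
// RLP list.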
func (snapshot *Snapshot) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{snapshot.addresses, snapshot.removedRaftIds, snapshot.headBlockHash})
}
func (snapshot *Snapshot) DecodeRLP(s *rlp.Stream) error {
// These fields must be exported so the RLP decoder can populate them:
var temp struct {
Addresses []Address
RemovedRaftIds []uint16
HeadBlockHash common.Hash
}
if err := s.Decode(&temp); err != nil {
return err
} else {
snapshot.addresses, snapshot.removedRaftIds, snapshot.headBlockHash = temp.Addresses, temp.RemovedRaftIds, temp.HeadBlockHash
return nil
}
}
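// For illustration only (not part of this change), a Snapshot round-trips
// through these helpers as:
//
//	data := snapshot.toBytes()        // RLP bytes stored in raftpb.Snapshot.Data
//	restored := bytesToSnapshot(data) // yields an equivalent Snapshot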
// Raft snapshot
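// saveRaftSnapshot persists the snapshot to disk via the snapshotter and then
// releases the WAL lock up to the snapshot's index so older WAL files can be
// purged.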
func (pm *ProtocolManager) saveRaftSnapshot(snap raftpb.Snapshot) error {
if err := pm.snapshotter.SaveSnap(snap); err != nil {
return err
}
@ -25,42 +248,7 @@ func (pm *ProtocolManager) saveSnapshot(snap raftpb.Snapshot) error {
return pm.wal.ReleaseLockTo(snap.Metadata.Index)
}
func (pm *ProtocolManager) maybeTriggerSnapshot() {
if pm.appliedIndex-pm.snapshotIndex < snapshotPeriod {
return
}
pm.triggerSnapshot()
}
func (pm *ProtocolManager) triggerSnapshot() {
glog.V(logger.Info).Infof("start snapshot [applied index: %d | last snapshot index: %d]", pm.appliedIndex, pm.snapshotIndex)
snapData := pm.blockchain.CurrentBlock().Hash().Bytes()
snap, err := pm.raftStorage.CreateSnapshot(pm.appliedIndex, &pm.confState, snapData)
if err != nil {
panic(err)
}
if err := pm.saveSnapshot(snap); err != nil {
panic(err)
}
// Discard all log entries prior to appliedIndex.
if err := pm.raftStorage.Compact(pm.appliedIndex); err != nil {
panic(err)
}
glog.V(logger.Info).Infof("compacted log at index %d", pm.appliedIndex)
pm.snapshotIndex = pm.appliedIndex
}
// For persisting cluster membership changes correctly, we need to trigger a
// snapshot before advancing our persisted appliedIndex in LevelDB.
//
// See handling of EntryConfChange entries in raft/handler.go for details.
func (pm *ProtocolManager) triggerSnapshotWithNextIndex(index uint64) {
pm.appliedIndex = index
pm.triggerSnapshot()
}
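// readRaftSnapshot loads the most recent snapshot known to the snapshotter,
// returning nil if no snapshot has been taken yet and aborting on any other
// load error.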
func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
func (pm *ProtocolManager) readRaftSnapshot() *raftpb.Snapshot {
snapshot, err := pm.snapshotter.Load()
if err != nil && err != snap.ErrNoSnapshot {
glog.Fatalf("error loading snapshot: %v", err)
@ -69,14 +257,78 @@ func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
return snapshot
}
func (pm *ProtocolManager) applySnapshot(snap raftpb.Snapshot) {
if err := pm.raftStorage.ApplySnapshot(snap); err != nil {
func (pm *ProtocolManager) applyRaftSnapshot(raftSnapshot raftpb.Snapshot) {
glog.V(logger.Info).Infof("applying snapshot to raft storage")
if err := pm.raftStorage.ApplySnapshot(raftSnapshot); err != nil {
glog.Fatalln("failed to apply snapshot: ", err)
}
snapshot := bytesToSnapshot(raftSnapshot.Data)
snapMeta := snap.Metadata
latestBlockHash := snapshot.headBlockHash
pm.updateClusterMembership(raftSnapshot.Metadata.ConfState, snapshot.addresses, snapshot.removedRaftIds)
preSyncHead := pm.blockchain.CurrentBlock()
glog.V(logger.Info).Infof("before sync, chain head is at block %x", preSyncHead.Hash())
if latestBlock := pm.blockchain.GetBlockByHash(latestBlockHash); latestBlock == nil {
pm.syncBlockchainUntil(latestBlockHash)
pm.logNewlyAcceptedTransactions(preSyncHead)
glog.V(logger.Info).Infof("%s: %x\n", chainExtensionMessage, pm.blockchain.CurrentBlock().Hash())
} else {
glog.V(logger.Info).Infof("blockchain is caught up; no need to synchronize")
}
snapMeta := raftSnapshot.Metadata
pm.confState = snapMeta.ConfState
pm.mu.Lock()
pm.snapshotIndex = snapMeta.Index
pm.advanceAppliedIndex(snapMeta.Index)
pm.mu.Unlock()
}
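// syncBlockchainUntil repeatedly attempts a bounded full sync against each of
// our current peers until one succeeds in extending the chain to the given
// block hash, sleeping briefly after each failed attempt.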
func (pm *ProtocolManager) syncBlockchainUntil(hash common.Hash) {
pm.mu.RLock()
peerMap := make(map[uint16]*Peer, len(pm.peers))
for raftId, peer := range pm.peers {
peerMap[raftId] = peer
}
pm.mu.RUnlock()
for {
for peerId, peer := range peerMap {
glog.V(logger.Info).Infof("synchronizing with peer %v up to block %x", peerId, hash)
enodeId := peer.p2pNode.ID.String()
peerIdPrefix := fmt.Sprintf("%x", peer.p2pNode.ID[:8])
if err := pm.downloader.Synchronise(peerIdPrefix, hash, big.NewInt(0), downloader.BoundedFullSync); err != nil {
glog.V(logger.Warn).Infof("failed to synchronize with peer %v", enodeId)
time.Sleep(500 * time.Millisecond)
} else {
return
}
}
}
}
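// logNewlyAcceptedTransactions walks the blocks added between the pre-sync
// head and the new chain head, oldest first, and emits a TxAccepted raft
// checkpoint for every transaction they contain.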
func (pm *ProtocolManager) logNewlyAcceptedTransactions(preSyncHead *types.Block) {
newHead := pm.blockchain.CurrentBlock()
numBlocks := newHead.NumberU64() - preSyncHead.NumberU64()
blocks := make([]*types.Block, numBlocks)
currBlock := newHead
blocksSeen := 0
for currBlock.Hash() != preSyncHead.Hash() {
blocks[int(numBlocks)-(1+blocksSeen)] = currBlock
blocksSeen++
currBlock = pm.blockchain.GetBlockByHash(currBlock.ParentHash())
}
for _, block := range blocks {
for _, tx := range block.Transactions() {
logger.LogRaftCheckpoint(logger.TxAccepted, tx.Hash().Hex())
}
}
}

View File

@ -10,47 +10,42 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
)
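// openWAL opens the node's write-ahead log, creating it on first start, and
// positions it just past the entries already covered by the given raft
// snapshot, if any.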
func (pm *ProtocolManager) openWAL(maybeSnapshot *raftpb.Snapshot) *wal.WAL {
func (pm *ProtocolManager) openWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WAL {
if !wal.Exist(pm.waldir) {
if err := os.Mkdir(pm.waldir, 0750); err != nil {
glog.Fatalf("cannot create waldir (%v)", err)
glog.Fatalf("cannot create waldir: %v", err)
}
wal, err := wal.Create(pm.waldir, nil)
if err != nil {
glog.Fatalf("failed to create waldir (%v)", err)
glog.Fatalf("failed to create waldir: %v", err)
}
wal.Close()
}
walsnap := walpb.Snapshot{}
if maybeSnapshot != nil {
walsnap.Index = maybeSnapshot.Metadata.Index
walsnap.Term = maybeSnapshot.Metadata.Term
}
glog.V(logger.Info).Infof("loading WAL")
glog.V(logger.Info).Infof("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index)
walsnap := walpb.Snapshot{}
if maybeRaftSnapshot != nil {
walsnap.Index, walsnap.Term = maybeRaftSnapshot.Metadata.Index, maybeRaftSnapshot.Metadata.Term
}
wal, err := wal.Open(pm.waldir, walsnap)
if err != nil {
glog.Fatalf("error loading WAL (%v)", err)
glog.Fatalf("error loading WAL: %v", err)
}
return wal
}
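// replayWAL reads the WAL (from the supplied snapshot onward) and replays it
// into raft's in-memory storage so a restarted node recovers its hard state
// and log entries.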
func (pm *ProtocolManager) replayWAL() *wal.WAL {
func (pm *ProtocolManager) replayWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WAL {
glog.V(logger.Info).Infoln("replaying WAL")
maybeSnapshot := pm.loadSnapshot()
wal := pm.openWAL(maybeSnapshot)
wal := pm.openWAL(maybeRaftSnapshot)
_, hardState, entries, err := wal.ReadAll()
if err != nil {
glog.Fatalf("failed to read WAL (%v)", err)
}
if maybeSnapshot != nil {
pm.applySnapshot(*maybeSnapshot)
glog.Fatalf("failed to read WAL: %v", err)
}
pm.raftStorage.SetHardState(hardState)