Dynamic raft membership support (#130)

Authored by Brian Schroeder on 2017-08-28 17:43:44 -04:00; committed by Joel Burget
parent 57e7e5f92f
commit a218568402
19 changed files with 1034 additions and 290 deletions

View File

@ -220,6 +220,8 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth
datadir := ctx.GlobalString(utils.DataDirFlag.Name)
joinExistingId := ctx.GlobalInt(utils.RaftJoinExistingFlag.Name)
raftPort := uint16(ctx.GlobalInt(utils.RaftPortFlag.Name))
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
privkey := cfg.Node.NodeKey()
strId := discover.PubkeyID(&privkey.PublicKey).String()
@ -232,9 +234,16 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth
if joinExistingId > 0 {
myId = uint16(joinExistingId)
joinExisting = true
} else if len(peers) == 0 {
utils.Fatalf("Raft-based consensus requires either (1) an initial peers list (in static-nodes.json) including this enode hash (%v), or (2) the flag --raftjoinexisting RAFT_ID, where RAFT_ID has been issued by an existing cluster member calling `raft.addPeer(ENODE_ID)` with an enode ID containing this node's enode hash.", strId)
} else {
peerIds := make([]string, len(peers))
for peerIdx, peer := range peers {
if !peer.HasRaftPort() {
utils.Fatalf("raftport querystring parameter not specified in static-node enode ID: %v. please check your static-nodes.json file.", peer.String())
}
peerId := peer.ID.String()
peerIds[peerIdx] = peerId
if peerId == strId {
@ -249,7 +258,7 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth
ethereum := <-ethChan
return raft.New(ctx, ethereum.ChainConfig(), myId, joinExisting, blockTimeNanos, ethereum, peers, datadir)
return raft.New(ctx, ethereum.ChainConfig(), myId, raftPort, joinExisting, blockTimeNanos, ethereum, peers, datadir)
}); err != nil {
utils.Fatalf("Failed to register the Raft service: %v", err)
}

View File

@ -116,6 +116,7 @@ var (
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
utils.EmitCheckpointsFlag,
}

View File

@ -122,6 +122,15 @@ var AppHelpFlagGroups = []flagGroup{
utils.RaftJoinExistingFlag,
},
},
{
Name: "RAFT",
Flags: []cli.Flag{
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
},
},
{
Name: "ACCOUNT",
Flags: []cli.Flag{

View File

@ -505,6 +505,11 @@ var (
Name: "emitcheckpoints",
Usage: "If enabled, emit specially formatted logging checkpoints",
}
RaftPortFlag = cli.IntFlag{
Name: "raftport",
Usage: "The port to bind for the raft transport",
Value: 50400,
}
// Quorum
EnableNodePermissionFlag = cli.BoolFlag{

View File

@ -394,6 +394,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if p == nil {
return errUnknownPeer
}
if d.mode == BoundedFullSync {
err := d.syncWithPeerUntil(p, hash, td)
if err == nil {
d.processFullSyncContent()
}
return err
}
return d.syncWithPeer(p, hash, td)
}
@ -1287,7 +1294,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
if d.mode == FullSync || d.mode == FastSync || d.mode == BoundedFullSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
@ -1392,9 +1399,9 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync.Cancel()
if err := d.commitPivotBlock(P); err != nil {
return err
}
}
if err := d.importBlockResults(afterP); err != nil {
return err
}
@ -1584,3 +1591,211 @@ func (d *Downloader) requestTTL() time.Duration {
}
return ttl
}
// Extra downloader functionality for non-proof-of-work consensus
// Synchronizes with a peer, but only up to the provided Hash
func (d *Downloader) syncWithPeerUntil(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
}
}()
if p.version < 62 {
return errTooOld
}
log.Info("Synchronising with the network", "id", p.id, "version", p.version)
defer func(start time.Time) {
log.Info("Synchronisation terminated", "duration", time.Since(start))
}(time.Now())
remoteHeader, err := d.fetchHeader(p, hash)
if err != nil {
return err
}
remoteHeight := remoteHeader.Number.Uint64()
localHeight := d.blockchain.CurrentBlock().NumberU64()
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= localHeight || d.syncStatsChainOrigin > localHeight {
d.syncStatsChainOrigin = localHeight
}
d.syncStatsChainHeight = remoteHeight
d.syncStatsLock.Unlock()
d.queue.Prepare(localHeight+1, d.mode, uint64(0), remoteHeader)
if d.syncInitHook != nil {
d.syncInitHook(localHeight, remoteHeight)
}
fetchers := []func() error{
func() error { return d.fetchBoundedHeaders(p, localHeight+1, remoteHeight) },
func() error { return d.fetchBodies(localHeight + 1) },
func() error { return d.fetchReceipts(localHeight + 1) }, // Receipts are only retrieved during fast sync
func() error { return d.processHeaders(localHeight+1, td) },
}
return d.spawnSync(fetchers)
}
// Fetches a single header from a peer
func (d *Downloader) fetchHeader(p *peerConnection, hash common.Hash) (*types.Header, error) {
log.Info("retrieving remote chain height", "peer", p)
go p.peer.RequestHeadersByHash(hash, 1, 0, false)
timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
return nil, errCancelBlockFetch
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
log.Info("invalid number of head headers (!= 1)", "peer", p, "len(headers)", len(headers))
return nil, errBadPeer
}
return headers[0], nil
case <-timeout:
log.Info("head header timeout", "peer", p)
return nil, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
}
}
// Not defined in go's stdlib:
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
// Fetches headers between `from` and `to`, inclusive.
// Assumes invariant: from <= to.
func (d *Downloader) fetchBoundedHeaders(p *peerConnection, from uint64, to uint64) error {
log.Info("directing header downloads", "peer", p, "from", from, "to", to)
defer log.Info("header download terminated", "peer", p)
// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
getHeaders := func(from uint64) {
request = time.Now()
timeout.Reset(d.requestTTL())
skeletonStart := from + uint64(MaxHeaderFetch) - 1
if skeleton {
if skeletonStart > to {
skeleton = false
}
}
if skeleton {
numSkeletonHeaders := minInt(MaxSkeletonSize, (int(to-from)+1)/MaxHeaderFetch)
log.Trace("fetching skeleton headers", "peer", p, "num skeleton headers", numSkeletonHeaders, "from", from)
go p.peer.RequestHeadersByNumber(skeletonStart, numSkeletonHeaders, MaxHeaderFetch-1, false)
} else {
// There are not enough headers remaining to warrant a skeleton fetch.
// Grab all of the remaining headers.
numHeaders := int(to-from) + 1
log.Trace("fetching full headers", "peer", p, "num headers", numHeaders, "from", from)
go p.peer.RequestHeadersByNumber(from, numHeaders, 0, false)
}
}
// Start pulling the header chain skeleton until all is done
getHeaders(from)
for {
select {
case <-d.cancelCh:
return errCancelHeaderFetch
case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id {
log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
headers := packet.(*headerPack).headers
// If we received a skeleton batch, resolve internals concurrently
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
log.Debug("skeleton chain invalid", "peer", p, "err", err)
return errInvalidChain
}
headers = filled[proced:]
from += uint64(proced)
}
// Insert all the new headers and fetch the next batch
if len(headers) > 0 {
log.Trace("schedule headers", "peer", p, "num headers", len(headers), "from", from)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
return errCancelHeaderFetch
}
from += uint64(len(headers))
}
if from <= to {
getHeaders(from)
} else {
// Notify the content fetchers that no more headers are inbound and return.
select {
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCancelHeaderFetch
}
}
case <-timeout.C:
// Header retrieval timed out, consider the peer bad and drop
log.Info("header request timed out", "peer", p)
headerTimeoutMeter.Mark(1)
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
select {
case d.headerProcCh <- nil:
case <-d.cancelCh:
}
return errBadPeer
}
}
}

View File

@ -25,6 +25,8 @@ const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
FastSync // Quickly download the headers, full sync only at the chain head
LightSync // Download only the headers and terminate afterwards
// Used by raft:
BoundedFullSync SyncMode = 100 // Perform a full sync until the requested hash, and no further
)
func (mode SyncMode) IsValid() bool {

View File

@ -332,6 +332,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
defer msg.Discard()
if pm.raftMode {
if msg.Code != TxMsg &&
msg.Code != GetBlockHeadersMsg && msg.Code != BlockHeadersMsg &&
msg.Code != GetBlockBodiesMsg && msg.Code != BlockBodiesMsg {
log.Info("raft: ignoring message", "code", msg.Code)
return nil
}
}
// Handle the message depending on its contents
switch {
case msg.Code == StatusMsg:
@ -723,7 +734,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
// NOTE: Raft-based consensus currently assumes that geth broadcasts
// transactions to all peers in the network. A previous comment here
// indicated that this logic might change in the future to only send to a
// subset of peers. If this change occurs upstream, a merge conflict should
// arise here, and we should add logic to send to *all* peers in raft mode.
for _, peer := range peers {
peer.SendTransactions(types.Transactions{tx})
}

View File

@ -148,11 +148,15 @@ func (pm *ProtocolManager) syncer() {
if pm.peers.Len() < minDesiredPeerCount {
break
}
go pm.synchronise(pm.peers.BestPeer())
if !pm.raftMode {
go pm.synchronise(pm.peers.BestPeer())
}
case <-forceSync.C:
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
if !pm.raftMode {
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
}
case <-pm.noMorePeers:
return

View File

@ -713,6 +713,16 @@ web3._extend({
new web3._extend.Property({
name: 'role',
getter: 'raft_role'
}),
new web3._extend.Method({
name: 'addPeer',
call: 'raft_addPeer',
params: 1
}),
new web3._extend.Method({
name: 'removePeer',
call: 'raft_removePeer',
params: 1
})
]
})

View File

@ -44,6 +44,8 @@ type Node struct {
UDP, TCP uint16 // port numbers
ID NodeID // the node's public key
RaftPort uint16
// This is a cached copy of sha3(ID) which is used for node
// distance calculations. This is part of Node in order to make it
// possible to write tests that need a node at a certain distance.
@ -111,10 +113,23 @@ func (n *Node) String() string {
if n.UDP != n.TCP {
u.RawQuery = "discport=" + strconv.Itoa(int(n.UDP))
}
if n.HasRaftPort() {
raftQuery := "raftport=" + strconv.Itoa(int(n.RaftPort))
if len(u.RawQuery) > 0 {
u.RawQuery = u.RawQuery + "&" + raftQuery
} else {
u.RawQuery = raftQuery
}
}
}
return u.String()
}
func (n *Node) HasRaftPort() bool {
return n.RaftPort > 0
}
var incompleteNodeURL = regexp.MustCompile("(?i)^(?:enode://)?([0-9a-f]+)$")
// ParseNode parses a node designator.
@ -195,7 +210,17 @@ func parseComplete(rawurl string) (*Node, error) {
return nil, errors.New("invalid discport in query")
}
}
return NewNode(id, ip, uint16(udpPort), uint16(tcpPort)), nil
node := NewNode(id, ip, uint16(udpPort), uint16(tcpPort))
if qv.Get("raftport") != "" {
raftPort, err := strconv.ParseUint(qv.Get("raftport"), 10, 16)
if err != nil {
return nil, errors.New("invalid raftport in query")
}
node.RaftPort = uint16(raftPort)
}
return node, nil
}
// MustParseNode parses a node URL. It panics if the URL is not valid.

View File

@ -1,5 +1,15 @@
package raft
type RaftNodeInfo struct {
ClusterSize int `json:"clusterSize"`
Role string `json:"role"`
Address *Address `json:"address"`
PeerAddresses []*Address `json:"peerAddresses"`
RemovedPeerIds []uint16 `json:"removedPeerIds"`
AppliedIndex uint64 `json:"appliedIndex"`
SnapshotIndex uint64 `json:"snapshotIndex"`
}
type PublicRaftAPI struct {
raftService *RaftService
}
@ -12,8 +22,8 @@ func (s *PublicRaftAPI) Role() string {
return s.raftService.raftProtocolManager.NodeInfo().Role
}
func (s *PublicRaftAPI) AddPeer(raftId uint16, enodeId string) error {
return s.raftService.raftProtocolManager.ProposeNewPeer(raftId, enodeId)
func (s *PublicRaftAPI) AddPeer(enodeId string) (uint16, error) {
return s.raftService.raftProtocolManager.ProposeNewPeer(enodeId)
}
func (s *PublicRaftAPI) RemovePeer(raftId uint16) {

View File

@ -5,9 +5,9 @@ import (
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
@ -24,6 +24,7 @@ type RaftService struct {
txMu sync.Mutex
txPool *core.TxPool
accountManager *accounts.Manager
downloader *downloader.Downloader
raftProtocolManager *ProtocolManager
startPeers []*discover.Node
@ -33,27 +34,21 @@ type RaftService struct {
minter *minter
}
type RaftNodeInfo struct {
ClusterSize int `json:"clusterSize"`
Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
Role string `json:"role"`
}
func New(ctx *node.ServiceContext, chainConfig *params.ChainConfig, raftId uint16, joinExisting bool, blockTime time.Duration, e *eth.Ethereum, startPeers []*discover.Node, datadir string) (*RaftService, error) {
func New(ctx *node.ServiceContext, chainConfig *params.ChainConfig, raftId, raftPort uint16, joinExisting bool, blockTime time.Duration, e *eth.Ethereum, startPeers []*discover.Node, datadir string) (*RaftService, error) {
service := &RaftService{
eventMux: ctx.EventMux,
chainDb: e.ChainDb(),
blockchain: e.BlockChain(),
txPool: e.TxPool(),
accountManager: e.AccountManager(),
downloader: e.Downloader(),
startPeers: startPeers,
}
service.minter = newMinter(chainConfig, service, blockTime)
var err error
if service.raftProtocolManager, err = NewProtocolManager(raftId, service.blockchain, service.eventMux, startPeers, joinExisting, datadir, service.minter); err != nil {
if service.raftProtocolManager, err = NewProtocolManager(raftId, raftPort, service.blockchain, service.eventMux, startPeers, joinExisting, datadir, service.minter, service.downloader); err != nil {
return nil, err
}
@ -95,6 +90,7 @@ func (service *RaftService) Start(p2pServer *p2p.Server) error {
func (service *RaftService) Stop() error {
service.blockchain.Stop()
service.raftProtocolManager.Stop()
service.minter.stop()
service.eventMux.Stop()
service.chainDb.Close()

View File

@ -26,6 +26,8 @@ const (
snapshotPeriod = 250
peerUrlKeyPrefix = "peerUrl-"
chainExtensionMessage = "Successfully extended chain"
)
var (

View File

@ -6,8 +6,6 @@ This directory holds an implementation of a [Raft](https://raft.github.io)-based
When the `geth` binary is passed the `--raft` flag, the node will operate in "raft mode."
Currently Raft-based consensus requires that all nodes in the cluster are configured to list the others up-front as [static peers](https://github.com/ethereum/go-ethereum/wiki/Connecting-to-the-network#static-nodes). We will be adding support for dynamic membership changes in the near future.
## Some implementation basics
Note: Though we use the etcd implementation of the Raft protocol, we speak of "Raft" more broadly to refer to both the protocol itself and its use to achieve consensus for Quorum/Ethereum.
@ -146,6 +144,16 @@ There is currently no limit to the length of these speculative chains, but we pl
We communicate blocks over the HTTP transport layer built into etcd Raft. It's also (at least theoretically) possible to use the p2p protocol built into Ethereum as a transport for Raft. In our testing we found the default etcd HTTP transport to be more reliable than the p2p transport (at least as implemented in geth) under high load.
Quorum listens on port 50400 by default for the raft transport, but this is configurable with the `--raftport` flag.
## Initial configuration, and enacting membership changes
Currently Raft-based consensus requires that all _initial_ nodes in the cluster are configured to list the others up-front as [static peers](https://github.com/ethereum/go-ethereum/wiki/Connecting-to-the-network#static-nodes). These enode ID URIs _must_ include a `raftport` querystring parameter specifying the raft port for each peer: e.g. `enode://abcd@127.0.0.1:30400?raftport=50400`.
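For illustration, a minimal `static-nodes.json` for a three-node cluster might look like the following sketch; the enode hashes, IP addresses, and ports are placeholders, and only the `raftport` querystring parameter is specific to raft mode:

```json
[
  "enode://abcd@127.0.0.1:21000?raftport=50401",
  "enode://ef01@127.0.0.1:21001?raftport=50402",
  "enode://2345@127.0.0.1:21002?raftport=50403"
]
```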
To remove a node from the cluster, attach to a JS console and issue `raft.removePeer(raftId)`, where `raftId` is the number of the node you wish to remove. For initial nodes in the cluster, this number is the 1-indexed position of the node's enode ID in the static peers list. Removal is permanent: the removed raft ID can never reconnect to the cluster, and that party must re-join the cluster under a new raft ID.
To add a node to the cluster, attach to a JS console and issue `raft.addPeer(enodeId)`. As with the enode IDs listed in the static peers JSON file, this enode ID must include a `raftport` querystring parameter. The call allocates and returns a raft ID that is not already in use. After `addPeer`, start the new geth node with the flag `--raftjoinexisting RAFTID` in addition to `--raft`.
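As a sketch of this workflow from a geth JavaScript console attached to an existing cluster member (the enode URL and the returned raft ID below are placeholders):

```js
// Propose the new member; the call returns the raft ID allocated for it.
// (The enode URL here is a placeholder.)
var newRaftId = raft.addPeer("enode://abcd@127.0.0.1:21003?raftport=50404");
// e.g. newRaftId == 4

// Later, to permanently remove that member from the cluster:
raft.removePeer(newRaftId);
```

The newly added node would then be started with `--raft --raftjoinexisting 4` (substituting whatever raft ID `addPeer` returned), after which it joins the existing cluster instead of bootstrapping a new one from `static-nodes.json`.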
## FAQ
### Could you have a single- or two-node cluster? More generally, could you have an even number of nodes?

View File

@ -2,6 +2,15 @@ package raft
import (
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"sync"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
@ -13,15 +22,6 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/net/context"
"math/big"
"net/http"
"net/url"
"os"
"strconv"
"sync"
"syscall"
"time"
"github.com/coreos/etcd/etcdserver/stats"
raftTypes "github.com/coreos/etcd/pkg/types"
@ -29,81 +29,71 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/syndtr/goleveldb/leveldb"
"gopkg.in/fatih/set.v0"
)
// Overview of the channels used in this module:
//
// Node.
// * quitSync: *Every* channel operation can be unblocked by closing this
// channel.
//
// ProtocolManager.
// * proposeC, for proposals flowing from ethereum to raft
// * confChangeC, currently unused; in the future for adding new, non-initial, raft peers
// * roleC, coming from raft notifies us when our role changes
type ProtocolManager struct {
raftId uint16 // This node's raft id
joinExisting bool // Whether to join an existing cluster when a WAL doesn't already exist
mu sync.RWMutex // For protecting concurrent JS access to "local peer" and "remote peer" state
quitSync chan struct{}
stopped bool
// Static configuration
joinExisting bool // Whether to join an existing cluster when a WAL doesn't already exist
bootstrapNodes []*discover.Node
peers map[uint16]*Peer
p2pServer *p2p.Server // Initialized in start()
raftId uint16
raftPort uint16
// Local peer state (protected by mu vs concurrent access via JS)
address *Address
role int // Role: minter or verifier
appliedIndex uint64 // The index of the last-applied raft entry
snapshotIndex uint64 // The index of the latest snapshot.
// Remote peer state (protected by mu vs concurrent access via JS)
peers map[uint16]*Peer
removedPeers *set.Set // *Permanently removed* peers
// P2P transport
p2pServer *p2p.Server // Initialized in start()
// Blockchain services
blockchain *core.BlockChain
// to protect the raft peers and addresses
mu sync.RWMutex
eventMux *event.TypeMux
minedBlockSub *event.TypeMuxSubscription
minedBlockChan <-chan struct{}
downloader *downloader.Downloader
peerGetter func() (string, *big.Int)
minter *minter
rawNode etcdRaft.Node
raftStorage *etcdRaft.MemoryStorage
// Blockchain events
eventMux *event.TypeMux
minedBlockSub *event.TypeMuxSubscription
transport *rafthttp.Transport
httpstopc chan struct{}
httpdonec chan struct{}
// Raft proposal events
blockProposalC chan *types.Block // for mined blocks to raft
confChangeProposalC chan raftpb.ConfChange // for config changes from js console to raft
// The number of entries applied to the raft log
appliedIndex uint64
// Raft transport
unsafeRawNode etcdRaft.Node
transport *rafthttp.Transport
httpstopc chan struct{}
httpdonec chan struct{}
// The index of the latest snapshot.
snapshotIndex uint64
// Snapshotting
// Raft snapshotting
snapshotter *snap.Snapshotter
snapdir string
confState raftpb.ConfState
// write-ahead log
// Raft write-ahead log
waldir string
wal *wal.WAL
// Persistence outside of the blockchain and raft log to keep track of our
// last-applied raft index and raft peer URLs.
quorumRaftDb *leveldb.DB
blockProposalC chan *types.Block
confChangeProposalC chan raftpb.ConfChange
quitSync chan struct{}
// Note: we don't actually use this field. We just set it at the same time as
// starting or stopping the miner in notifyRoleChange. We might want to remove
// it, but it might also be useful to check.
role int
minter *minter
// Storage
quorumRaftDb *leveldb.DB // Persistent storage for last-applied raft index
raftStorage *etcdRaft.MemoryStorage // Volatile raft storage
}
//
// Public interface
//
func NewProtocolManager(raftId uint16, blockchain *core.BlockChain, mux *event.TypeMux, bootstrapNodes []*discover.Node, joinExisting bool, datadir string, minter *minter) (*ProtocolManager, error) {
func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockChain, mux *event.TypeMux, bootstrapNodes []*discover.Node, joinExisting bool, datadir string, minter *minter, downloader *downloader.Downloader) (*ProtocolManager, error) {
waldir := fmt.Sprintf("%s/raft-wal", datadir)
snapdir := fmt.Sprintf("%s/raft-snap", datadir)
quorumRaftDbLoc := fmt.Sprintf("%s/quorum-raft-state", datadir)
@ -111,6 +101,7 @@ func NewProtocolManager(raftId uint16, blockchain *core.BlockChain, mux *event.T
manager := &ProtocolManager{
bootstrapNodes: bootstrapNodes,
peers: make(map[uint16]*Peer),
removedPeers: set.New(),
joinExisting: joinExisting,
blockchain: blockchain,
eventMux: mux,
@ -122,9 +113,11 @@ func NewProtocolManager(raftId uint16, blockchain *core.BlockChain, mux *event.T
snapdir: snapdir,
snapshotter: snap.New(snapdir),
raftId: raftId,
raftPort: raftPort,
quitSync: make(chan struct{}),
raftStorage: etcdRaft.NewMemoryStorage(),
minter: minter,
downloader: downloader,
}
if db, err := openQuorumRaftDb(quorumRaftDbLoc); err != nil {
@ -141,34 +134,51 @@ func (pm *ProtocolManager) Start(p2pServer *p2p.Server) {
pm.p2pServer = p2pServer
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop(pm.blockProposalC)
pm.startRaft()
go pm.minedBroadcastLoop()
}
func (pm *ProtocolManager) Stop() {
pm.mu.Lock()
defer pm.mu.Unlock()
defer log.Info("raft protocol handler stopped")
if pm.stopped {
return
}
log.Info("stopping raft protocol handler...")
for raftId, peer := range pm.peers {
pm.disconnectFromPeer(raftId, peer)
}
pm.minedBlockSub.Unsubscribe()
pm.transport.Stop()
if pm.transport != nil {
pm.transport.Stop()
}
close(pm.httpstopc)
<-pm.httpdonec
close(pm.quitSync)
if pm.rawNode != nil {
pm.rawNode.Stop()
if pm.unsafeRawNode != nil {
pm.unsafeRawNode.Stop()
}
pm.quorumRaftDb.Close()
pm.minter.stop()
pm.p2pServer = nil
log.Info("raft protocol handler stopped")
pm.minter.stop()
pm.stopped = true
}
func (pm *ProtocolManager) NodeInfo() *RaftNodeInfo {
pm.mu.RLock() // as we read pm.role
pm.mu.RLock() // as we read role and peers
defer pm.mu.RUnlock()
var roleDescription string
@ -178,39 +188,130 @@ func (pm *ProtocolManager) NodeInfo() *RaftNodeInfo {
roleDescription = "verifier"
}
peerAddresses := make([]*Address, len(pm.peers))
peerIdx := 0
for _, peer := range pm.peers {
peerAddresses[peerIdx] = peer.address
peerIdx += 1
}
removedPeerIfaces := pm.removedPeers.List()
removedPeerIds := make([]uint16, len(removedPeerIfaces))
for i, removedIface := range removedPeerIfaces {
removedPeerIds[i] = removedIface.(uint16)
}
//
// NOTE: before exposing any new fields here, make sure that the underlying
// ProtocolManager members are protected from concurrent access by pm.mu!
//
return &RaftNodeInfo{
ClusterSize: len(pm.peers) + 1,
Genesis: pm.blockchain.Genesis().Hash(),
Head: pm.blockchain.CurrentBlock().Hash(),
Role: roleDescription,
ClusterSize: len(pm.peers) + 1,
Role: roleDescription,
Address: pm.address,
PeerAddresses: peerAddresses,
RemovedPeerIds: removedPeerIds,
AppliedIndex: pm.appliedIndex,
SnapshotIndex: pm.snapshotIndex,
}
}
func (pm *ProtocolManager) ProposeNewPeer(raftId uint16, enodeId string) error {
// There seems to be a very rare race in raft where during `etcdRaft.StartNode`
// it will call back our `Process` method before it's finished returning the
// `raft.Node`, `pm.unsafeRawNode`, to us. This re-entrance through a separate
// thread will cause a nil pointer dereference. To work around this, this
// getter method should be used instead of reading `pm.unsafeRawNode` directly.
func (pm *ProtocolManager) rawNode() etcdRaft.Node {
for pm.unsafeRawNode == nil {
time.Sleep(100 * time.Millisecond)
}
return pm.unsafeRawNode
}
func (pm *ProtocolManager) nextRaftId() uint16 {
pm.mu.RLock()
defer pm.mu.RUnlock()
maxId := pm.raftId
for peerId := range pm.peers {
if maxId < peerId {
maxId = peerId
}
}
removedPeerIfaces := pm.removedPeers.List()
for _, removedIface := range removedPeerIfaces {
removedId := removedIface.(uint16)
if maxId < removedId {
maxId = removedId
}
}
return maxId + 1
}
func (pm *ProtocolManager) isRaftIdRemoved(id uint16) bool {
pm.mu.RLock()
defer pm.mu.RUnlock()
return pm.removedPeers.Has(id)
}
func (pm *ProtocolManager) isRaftIdUsed(raftId uint16) bool {
if pm.raftId == raftId || pm.isRaftIdRemoved(raftId) {
return true
}
pm.mu.RLock()
defer pm.mu.RUnlock()
return pm.peers[raftId] != nil
}
func (pm *ProtocolManager) isP2pNodeInCluster(node *discover.Node) bool {
pm.mu.RLock()
defer pm.mu.RUnlock()
for _, peer := range pm.peers {
if peer.p2pNode.ID == node.ID {
return true
}
}
return false
}
func (pm *ProtocolManager) ProposeNewPeer(enodeId string) (uint16, error) {
node, err := discover.ParseNode(enodeId)
if err != nil {
return err
return 0, err
}
if pm.isP2pNodeInCluster(node) {
return 0, fmt.Errorf("node is already in the cluster: %v", enodeId)
}
if len(node.IP) != 4 {
return fmt.Errorf("expected IPv4 address (with length 4), but got IP of length %v", len(node.IP))
return 0, fmt.Errorf("expected IPv4 address (with length 4), but got IP of length %v", len(node.IP))
}
address := &Address{
raftId: raftId,
nodeId: node.ID,
ip: node.IP,
p2pPort: node.TCP,
raftPort: raftPort(raftId),
if !node.HasRaftPort() {
return 0, fmt.Errorf("enodeId is missing raftport querystring parameter: %v", enodeId)
}
raftId := pm.nextRaftId()
address := newAddress(raftId, node.RaftPort, node)
pm.confChangeProposalC <- raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: uint64(raftId),
Context: address.toBytes(),
}
return nil
return raftId, nil
}
func (pm *ProtocolManager) ProposePeerRemoval(raftId uint16) {
@ -229,7 +330,7 @@ func (pm *ProtocolManager) WriteMsg(msg p2p.Msg) error {
var buffer = make([]byte, msg.Size)
msg.Payload.Read(buffer)
return pm.rawNode.Propose(context.TODO(), buffer)
return pm.rawNode().Propose(context.TODO(), buffer)
}
//
@ -237,17 +338,17 @@ func (pm *ProtocolManager) WriteMsg(msg p2p.Msg) error {
//
func (pm *ProtocolManager) Process(ctx context.Context, m raftpb.Message) error {
return pm.rawNode.Step(ctx, m)
return pm.rawNode().Step(ctx, m)
}
func (pm *ProtocolManager) IsIDRemoved(id uint64) bool {
return pm.peers[uint16(id)] == nil
return pm.isRaftIdRemoved(uint16(id))
}
func (pm *ProtocolManager) ReportUnreachable(id uint64) {
log.Info("peer is currently unreachable", "peer", id)
log.Info("peer is currently unreachable", "peer id", id)
pm.rawNode.ReportUnreachable(id)
pm.rawNode().ReportUnreachable(id)
}
func (pm *ProtocolManager) ReportSnapshot(id uint64, status etcdRaft.SnapshotStatus) {
@ -257,7 +358,7 @@ func (pm *ProtocolManager) ReportSnapshot(id uint64, status etcdRaft.SnapshotSta
log.Info("finished sending snapshot", "raft peer", id)
}
pm.rawNode.ReportSnapshot(id, status)
pm.rawNode().ReportSnapshot(id, status)
}
//
@ -270,18 +371,51 @@ func (pm *ProtocolManager) startRaft() {
fatalf("cannot create dir for snapshot (%v)", err)
}
}
walExisted := wal.Exist(pm.waldir)
lastAppliedIndex := pm.loadAppliedIndex()
pm.wal = pm.replayWAL()
ss := &stats.ServerStats{}
ss.Initialize()
pm.transport = &rafthttp.Transport{
ID: raftTypes.ID(pm.raftId),
ClusterID: 0x1000,
Raft: pm,
ServerStats: ss,
LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(pm.raftId))),
ErrorC: make(chan error),
}
pm.transport.Start()
// We load the snapshot to connect to prev peers before replaying the WAL,
// which typically goes further into the future than the snapshot.
var maybeRaftSnapshot *raftpb.Snapshot
if walExisted {
maybeRaftSnapshot = pm.loadSnapshot() // re-establishes peer connections
}
pm.wal = pm.replayWAL(maybeRaftSnapshot)
if walExisted {
if hardState, _, err := pm.raftStorage.InitialState(); err != nil {
panic(fmt.Sprintf("failed to read initial state from raft while restarting: %v", err))
} else {
if lastPersistedCommittedIndex := hardState.Commit; lastPersistedCommittedIndex < lastAppliedIndex {
log.Info("rolling back applied index to last-durably-committed", "last applied index", lastAppliedIndex, "last persisted index", lastPersistedCommittedIndex)
// Roll back our applied index. See the logic and explanation around
// the single call to `pm.applyNewChainHead` for more context.
lastAppliedIndex = lastPersistedCommittedIndex
}
}
}
// NOTE: cockroach sets this to false for now until they've "worked out the
// bugs"
enablePreVote := true
lastAppliedIndex := pm.loadAppliedIndex()
c := &etcdRaft.Config{
raftConfig := &etcdRaft.Config{
Applied: lastAppliedIndex,
ID: uint64(pm.raftId),
ElectionTick: 10, // NOTE: cockroach sets this to 15
@ -313,50 +447,59 @@ func (pm *ProtocolManager) startRaft() {
MaxInflightMsgs: 256, // NOTE: in cockroachdb this is 4
}
log.Info("local raft ID", "ID", c.ID)
ss := &stats.ServerStats{}
ss.Initialize()
pm.transport = &rafthttp.Transport{
ID: raftTypes.ID(pm.raftId),
ClusterID: 0x1000,
Raft: pm,
ServerStats: ss,
LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(pm.raftId))),
ErrorC: make(chan error),
}
pm.transport.Start()
log.Info("startRaft", "raft ID", raftConfig.ID)
if walExisted {
log.Info("remounting an existing raft log; connecting to peers.")
pm.reconnectToPreviousPeers()
pm.rawNode = etcdRaft.RestartNode(c)
pm.unsafeRawNode = etcdRaft.RestartNode(raftConfig)
} else if pm.joinExisting {
log.Info("newly joining an existing cluster; waiting for connections.")
pm.rawNode = etcdRaft.StartNode(c, nil)
pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, nil)
} else {
if numPeers := len(pm.bootstrapNodes); numPeers == 0 {
panic("exiting due to empty raft peers list")
} else {
log.Info("starting a new raft log with an initial cluster.", "size", numPeers)
log.Info("starting a new raft log", "initial cluster size of", numPeers)
}
peers := makeInitialRaftPeers(pm.bootstrapNodes)
pm.rawNode = etcdRaft.StartNode(c, peers)
raftPeers, peerAddresses, localAddress := pm.makeInitialRaftPeers()
pm.setLocalAddress(localAddress)
// We add all peers up-front even though we will see a ConfChangeAddNode
// for each shortly. This is because raft's ConfState will contain all of
// these nodes before we see these log entries, and we always want our
// snapshots to have all addresses for each of the nodes in the ConfState.
for _, peerAddress := range peerAddresses {
pm.addPeer(peerAddress)
}
pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, raftPeers)
}
go pm.serveRaft()
go pm.serveLocalProposals()
go pm.eventLoop()
go pm.handleRoleChange(pm.rawNode.RoleChan().Out())
go pm.handleRoleChange(pm.rawNode().RoleChan().Out())
}
func (pm *ProtocolManager) setLocalAddress(addr *Address) {
pm.mu.Lock()
pm.address = addr
pm.mu.Unlock()
// By setting `URLs` on the raft transport, we advertise our URL (in an HTTP
// header) to any recipient. This is necessary for a newcomer to the cluster
// to be able to accept a snapshot from us to bootstrap them.
if urls, err := raftTypes.NewURLs([]string{raftUrl(addr)}); err == nil {
pm.transport.URLs = urls
} else {
panic(fmt.Sprintf("error: could not create URL from local address: %v", addr))
}
}
func (pm *ProtocolManager) serveRaft() {
urlString := fmt.Sprintf("http://0.0.0.0:%d", raftPort(pm.raftId))
urlString := fmt.Sprintf("http://0.0.0.0:%d", pm.raftPort)
url, err := url.Parse(urlString)
if err != nil {
fatalf("Failed parsing URL (%v)", err)
@ -403,12 +546,12 @@ func (pm *ProtocolManager) handleRoleChange(roleC <-chan interface{}) {
}
}
func (pm *ProtocolManager) minedBroadcastLoop(proposeC chan<- *types.Block) {
func (pm *ProtocolManager) minedBroadcastLoop() {
for obj := range pm.minedBlockSub.Chan() {
switch ev := obj.Data.(type) {
case core.NewMinedBlockEvent:
select {
case proposeC <- ev.Block:
case pm.blockProposalC <- ev.Block:
case <-pm.quitSync:
return
}
@ -439,7 +582,7 @@ func (pm *ProtocolManager) serveLocalProposals() {
r.Read(buffer)
// blocks until accepted by the raft state machine
pm.rawNode.Propose(context.TODO(), buffer)
pm.rawNode().Propose(context.TODO(), buffer)
case cc, ok := <-pm.confChangeProposalC:
if !ok {
log.Info("error: read from confChangeC failed")
@ -448,20 +591,22 @@ func (pm *ProtocolManager) serveLocalProposals() {
confChangeCount++
cc.ID = confChangeCount
pm.rawNode.ProposeConfChange(context.TODO(), cc)
pm.rawNode().ProposeConfChange(context.TODO(), cc)
case <-pm.quitSync:
return
}
}
}
func (pm *ProtocolManager) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
if len(ents) == 0 {
func (pm *ProtocolManager) entriesToApply(allEntries []raftpb.Entry) (entriesToApply []raftpb.Entry) {
if len(allEntries) == 0 {
return
}
first := ents[0].Index
first := allEntries[0].Index
pm.mu.RLock()
lastApplied := pm.appliedIndex
pm.mu.RUnlock()
if first > lastApplied+1 {
fatalf("first index of committed entry[%d] should <= appliedIndex[%d] + 1", first, lastApplied)
@ -469,13 +614,20 @@ func (pm *ProtocolManager) entriesToApply(ents []raftpb.Entry) (nents []raftpb.E
firstToApply := lastApplied - first + 1
if firstToApply < uint64(len(ents)) {
nents = ents[firstToApply:]
if firstToApply < uint64(len(allEntries)) {
entriesToApply = allEntries[firstToApply:]
}
return
}
func raftUrl(address *Address) string {
return fmt.Sprintf("http://%s:%d", address.ip, address.raftPort)
}
func (pm *ProtocolManager) addPeer(address *Address) {
pm.mu.Lock()
defer pm.mu.Unlock()
raftId := address.raftId
// Add P2P connection:
@ -483,12 +635,13 @@ func (pm *ProtocolManager) addPeer(address *Address) {
pm.p2pServer.AddPeer(p2pNode)
// Add raft transport connection:
peerUrl := fmt.Sprintf("http://%s:%d", address.ip, raftPort(raftId))
pm.transport.AddPeer(raftTypes.ID(raftId), []string{peerUrl})
pm.mu.Lock()
pm.transport.AddPeer(raftTypes.ID(raftId), []string{raftUrl(address)})
pm.peers[raftId] = &Peer{address, p2pNode}
pm.mu.Unlock()
}
func (pm *ProtocolManager) disconnectFromPeer(raftId uint16, peer *Peer) {
pm.p2pServer.RemovePeer(peer.p2pNode)
pm.transport.RemovePeer(raftTypes.ID(raftId))
}
func (pm *ProtocolManager) removePeer(raftId uint16) {
@ -496,22 +649,18 @@ func (pm *ProtocolManager) removePeer(raftId uint16) {
defer pm.mu.Unlock()
if peer := pm.peers[raftId]; peer != nil {
pm.p2pServer.RemovePeer(peer.p2pNode)
pm.transport.RemovePeer(raftTypes.ID(raftId))
pm.disconnectFromPeer(raftId, peer)
delete(pm.peers, raftId)
}
}
func (pm *ProtocolManager) reconnectToPreviousPeers() {
_, confState, _ := pm.raftStorage.InitialState()
for _, nodeRaftId := range confState.Nodes {
if nodeRaftId := uint16(nodeRaftId); nodeRaftId != pm.raftId {
address := pm.loadPeerAddress(nodeRaftId)
pm.addPeer(address)
}
}
// This is only necessary sometimes, but it's idempotent. Also, we *always*
// do this, and not just when there's still a peer in the map, because we
// need to do it for our *own* raft ID before we get booted from the cluster
// so that snapshots are identical on all nodes. It's important for a booted
// node to have a snapshot identical to every other node because that node
// can potentially re-enter the cluster with a new raft ID.
pm.removedPeers.Add(raftId)
}
func (pm *ProtocolManager) eventLoop() {
@ -524,16 +673,17 @@ func (pm *ProtocolManager) eventLoop() {
for {
select {
case <-ticker.C:
pm.rawNode.Tick()
pm.rawNode().Tick()
// when the node is first ready it gives us entries to commit and messages
// to immediately publish
case rd := <-pm.rawNode.Ready():
case rd := <-pm.rawNode().Ready():
pm.wal.Save(rd.HardState, rd.Entries)
if snap := rd.Snapshot; !etcdRaft.IsEmptySnap(snap) {
pm.saveSnapshot(snap)
pm.applySnapshot(snap)
pm.saveRaftSnapshot(snap)
pm.applyRaftSnapshot(snap)
pm.advanceAppliedIndex(snap.Metadata.Index)
}
// 1: Write HardState, Entries, and Snapshot to persistent storage if they
@ -555,47 +705,84 @@ func (pm *ProtocolManager) eventLoop() {
if err != nil {
log.Error("error decoding block: ", err)
}
pm.applyNewChainHead(&block)
if pm.blockchain.HasBlock(block.Hash()) {
// This can happen:
//
// if (1) we crashed after applying this block to the chain, but
// before writing appliedIndex to LDB.
// or (2) we crashed in a scenario where we applied further than
// raft *durably persisted* its committed index (see
// https://github.com/coreos/etcd/pull/7899). In this
// scenario, when the node comes back up, we will re-apply
// a few entries.
headBlockHash := pm.blockchain.CurrentBlock().Hash()
log.Warn("not applying already-applied block", "block hash", block.Hash(), "parent", block.ParentHash(), "head", headBlockHash)
} else {
pm.applyNewChainHead(&block)
}
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
raftId := uint16(cc.NodeID)
pm.confState = *pm.rawNode.ApplyConfChange(cc)
pm.confState = *pm.rawNode().ApplyConfChange(cc)
forceSnapshot := false
switch cc.Type {
case raftpb.ConfChangeAddNode:
log.Info("adding peer due to ConfChangeAddNode", "peer", cc.NodeID)
if pm.isRaftIdRemoved(raftId) {
log.Info("ignoring ConfChangeAddNode for permanently-removed peer", "raft id", raftId)
} else if raftId <= uint16(len(pm.bootstrapNodes)) {
// See initial cluster logic in startRaft() for more information.
log.Info("ignoring expected ConfChangeAddNode for initial peer", "raft id", raftId)
nodeRaftId := uint16(cc.NodeID)
pm.writePeerAddressBytes(nodeRaftId, cc.Context)
// We need a snapshot to exist to reconnect to peers on start-up after a crash.
forceSnapshot = true
} else if pm.isRaftIdUsed(raftId) {
log.Info("ignoring ConfChangeAddNode for already-used raft ID", "raft id", raftId)
} else {
log.Info("adding peer due to ConfChangeAddNode", "raft id", raftId)
if nodeRaftId != pm.raftId {
forceSnapshot = true
pm.addPeer(bytesToAddress(cc.Context))
}
case raftpb.ConfChangeRemoveNode:
log.Info("removing peer due to ConfChangeRemoveNode", "peer", cc.NodeID)
if nodeRaftId := uint16(cc.NodeID); nodeRaftId == pm.raftId {
exitAfterApplying = true
if pm.isRaftIdRemoved(raftId) {
log.Info("ignoring ConfChangeRemoveNode for already-removed peer", "raft id", raftId)
} else {
pm.removePeer(nodeRaftId)
log.Info("removing peer due to ConfChangeRemoveNode", "raft id", raftId)
forceSnapshot = true
if raftId == pm.raftId {
exitAfterApplying = true
}
pm.removePeer(raftId)
}
case raftpb.ConfChangeUpdateNode:
// NOTE: remember to forceSnapshot in this case, if we add support
// for this.
fatalf("not yet handled: ConfChangeUpdateNode")
}
// We force a snapshot here to persist our updated confState, so we
// know our fellow cluster members when we come back online.
//
// It is critical here to snapshot *before* writing our applied
// index in LevelDB, otherwise a crash while/before snapshotting
// (after advancing our applied index) would result in the loss of a
// cluster member upon restart: we would re-mount with an old
// ConfState.
pm.triggerSnapshotWithNextIndex(entry.Index)
if forceSnapshot {
// We force a snapshot here to persist our updated confState, so we
// know our fellow cluster members when we come back online.
//
// It is critical here to snapshot *before* writing our applied
// index in LevelDB, otherwise a crash while/before snapshotting
// (after advancing our applied index) would result in the loss of a
// cluster member upon restart: we would re-mount with an old
// ConfState.
pm.triggerSnapshot(entry.Index)
}
}
pm.advanceAppliedIndex(entry.Index)
@ -604,13 +791,16 @@ func (pm *ProtocolManager) eventLoop() {
pm.maybeTriggerSnapshot()
if exitAfterApplying {
log.Warn("removing self from the cluster due to ConfChangeRemoveNode")
syscall.Exit(0)
log.Warn("permanently removing self from the cluster")
pm.Stop()
log.Warn("permanently exited the cluster")
return
}
// 4: Call Node.Advance() to signal readiness for the next batch of
// updates.
pm.rawNode.Advance()
pm.rawNode().Advance()
case <-pm.quitSync:
return
@ -618,31 +808,33 @@ func (pm *ProtocolManager) eventLoop() {
}
}
func raftPort(raftId uint16) uint16 {
return 50400 + raftId
}
func makeInitialRaftPeers(initialNodes []*discover.Node) []etcdRaft.Peer {
peers := make([]etcdRaft.Peer, len(initialNodes))
func (pm *ProtocolManager) makeInitialRaftPeers() (raftPeers []etcdRaft.Peer, peerAddresses []*Address, localAddress *Address) {
initialNodes := pm.bootstrapNodes
raftPeers = make([]etcdRaft.Peer, len(initialNodes)) // Entire cluster
peerAddresses = make([]*Address, len(initialNodes)-1) // Cluster without *this* node
peersSeen := 0
for i, node := range initialNodes {
raftId := uint16(i + 1)
// We initially get the raftPort from the enode ID's query string. As an alternative, we can move away from
// requiring the use of static peers for the initial set, and load them from e.g. another JSON file which
// contains pairs of enodes and raft ports, or we can get this initial peer list from commandline flags.
address := newAddress(raftId, node.RaftPort, node)
address := &Address{
raftId: raftId,
nodeId: node.ID,
ip: node.IP,
p2pPort: node.TCP,
raftPort: raftPort(raftId),
}
peers[i] = etcdRaft.Peer{
raftPeers[i] = etcdRaft.Peer{
ID: uint64(raftId),
Context: address.toBytes(),
}
if raftId == pm.raftId {
localAddress = address
} else {
peerAddresses[peersSeen] = address
peersSeen += 1
}
}
return peers
return
}
func sleep(duration time.Duration) {
@ -683,7 +875,9 @@ func (pm *ProtocolManager) applyNewChainHead(block *types.Block) {
// Sets new appliedIndex in-memory, *and* writes this appliedIndex to LevelDB.
func (pm *ProtocolManager) advanceAppliedIndex(index uint64) {
pm.appliedIndex = index
pm.writeAppliedIndex(index)
pm.mu.Lock()
pm.appliedIndex = index
pm.mu.Unlock()
}

View File

@ -20,10 +20,20 @@ type Address struct {
raftPort uint16
}
func newAddress(raftId uint16, raftPort uint16, node *discover.Node) *Address {
return &Address{
raftId: raftId,
nodeId: node.ID,
ip: node.IP,
p2pPort: node.TCP,
raftPort: raftPort,
}
}
// A peer that we're connected to via both raft's http transport, and ethereum p2p
type Peer struct {
address *Address
p2pNode *discover.Node
address *Address // For raft transport
p2pNode *discover.Node // For ethereum transport
}
func (addr *Address) EncodeRLP(w io.Writer) error {

View File

@ -8,7 +8,6 @@ import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"strconv"
)
var (
@ -16,11 +15,6 @@ var (
NoWriteMerge: false,
Sync: false,
}
mustFsync = &opt.WriteOptions{
NoWriteMerge: false,
Sync: true,
}
)
func openQuorumRaftDb(path string) (db *leveldb.DB, err error) {
@ -46,30 +40,18 @@ func (pm *ProtocolManager) loadAppliedIndex() uint64 {
lastAppliedIndex = binary.LittleEndian.Uint64(dat)
}
log.Info("Persistent applied index load", "last applied index", lastAppliedIndex)
pm.mu.Lock()
pm.appliedIndex = lastAppliedIndex
pm.mu.Unlock()
log.Info("loaded the latest applied index", "lastAppliedIndex", lastAppliedIndex)
return lastAppliedIndex
}
func (pm *ProtocolManager) writeAppliedIndex(index uint64) {
log.Info("Persistent applied index write", "index", index)
log.Info("persisted the latest applied index", "index", index)
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, index)
pm.quorumRaftDb.Put(appliedDbKey, buf, noFsync)
}
func (pm *ProtocolManager) loadPeerAddress(raftId uint16) *Address {
peerUrlKey := []byte(peerUrlKeyPrefix + strconv.Itoa(int(raftId)))
value, err := pm.quorumRaftDb.Get(peerUrlKey, nil)
if err != nil {
fatalf("failed to read address for raft peer %d from leveldb: %v", raftId, err)
}
return bytesToAddress(value)
}
func (pm *ProtocolManager) writePeerAddressBytes(raftId uint16, value []byte) {
key := []byte(peerUrlKeyPrefix + strconv.Itoa(int(raftId)))
pm.quorumRaftDb.Put(key, value, mustFsync)
}

View File

@ -1,13 +1,236 @@
package raft
import (
"fmt"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal/walpb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0"
"io"
"math/big"
"sort"
"time"
)
func (pm *ProtocolManager) saveSnapshot(snap raftpb.Snapshot) error {
// Snapshot
type Snapshot struct {
addresses []Address
removedRaftIds []uint16 // Raft IDs for permanently removed peers
headBlockHash common.Hash
}
type ByRaftId []Address
func (a ByRaftId) Len() int { return len(a) }
func (a ByRaftId) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByRaftId) Less(i, j int) bool { return a[i].raftId < a[j].raftId }
func (pm *ProtocolManager) buildSnapshot() *Snapshot {
pm.mu.RLock()
defer pm.mu.RUnlock()
numNodes := len(pm.confState.Nodes)
numRemovedNodes := pm.removedPeers.Size()
snapshot := &Snapshot{
addresses: make([]Address, numNodes),
removedRaftIds: make([]uint16, numRemovedNodes),
headBlockHash: pm.blockchain.CurrentBlock().Hash(),
}
// Populate addresses
for i, rawRaftId := range pm.confState.Nodes {
raftId := uint16(rawRaftId)
if raftId == pm.raftId {
snapshot.addresses[i] = *pm.address
} else {
snapshot.addresses[i] = *pm.peers[raftId].address
}
}
sort.Sort(ByRaftId(snapshot.addresses))
// Populate removed IDs
for i, removedIface := range pm.removedPeers.List() {
snapshot.removedRaftIds[i] = removedIface.(uint16)
}
return snapshot
}
// Note that we do *not* read `pm.appliedIndex` here. We only use the `index`
// parameter instead. This is because we need to support a scenario when we
// snapshot for a future index that we have not yet recorded in LevelDB. See
// comments around the use of `forceSnapshot`.
func (pm *ProtocolManager) triggerSnapshot(index uint64) {
pm.mu.RLock()
snapshotIndex := pm.snapshotIndex
pm.mu.RUnlock()
log.Info("start snapshot", "applied index", pm.appliedIndex, "last snapshot index", snapshotIndex)
//snapData := pm.blockchain.CurrentBlock().Hash().Bytes()
//snap, err := pm.raftStorage.CreateSnapshot(pm.appliedIndex, &pm.confState, snapData)
snapData := pm.buildSnapshot().toBytes()
snap, err := pm.raftStorage.CreateSnapshot(index, &pm.confState, snapData)
if err != nil {
panic(err)
}
if err := pm.saveRaftSnapshot(snap); err != nil {
panic(err)
}
// Discard all log entries prior to index.
if err := pm.raftStorage.Compact(index); err != nil {
panic(err)
}
log.Info("compacted log", "index", pm.appliedIndex)
pm.mu.Lock()
pm.snapshotIndex = index
pm.mu.Unlock()
}
func confStateIdSet(confState raftpb.ConfState) *set.Set {
set := set.New()
for _, rawRaftId := range confState.Nodes {
set.Add(uint16(rawRaftId))
}
return set
}
func (pm *ProtocolManager) updateClusterMembership(newConfState raftpb.ConfState, addresses []Address, removedRaftIds []uint16) {
log.Info("updating cluster membership per raft snapshot")
prevConfState := pm.confState
// Update tombstones for permanently removed peers. For simplicity we do not
// allow the re-use of peer IDs once a peer is removed.
removedPeers := set.New()
for _, removedRaftId := range removedRaftIds {
removedPeers.Add(removedRaftId)
}
pm.mu.Lock()
pm.removedPeers = removedPeers
pm.mu.Unlock()
// Remove old peers that we're still connected to
prevIds := confStateIdSet(prevConfState)
newIds := confStateIdSet(newConfState)
idsToRemove := set.Difference(prevIds, newIds)
for _, idIfaceToRemove := range idsToRemove.List() {
raftId := idIfaceToRemove.(uint16)
log.Info("removing old raft peer", "peer id", raftId)
pm.removePeer(raftId)
}
// Update local and remote addresses
for _, tempAddress := range addresses {
address := tempAddress // Allocate separately on the heap for each iteration.
if address.raftId == pm.raftId {
// If we're a newcomer to an existing cluster, this is where we learn
// our own Address.
pm.setLocalAddress(&address)
} else {
pm.mu.RLock()
existingPeer := pm.peers[address.raftId]
pm.mu.RUnlock()
if existingPeer == nil {
log.Info("adding new raft peer", "raft id", address.raftId)
pm.addPeer(&address)
}
}
}
pm.mu.Lock()
pm.confState = newConfState
pm.mu.Unlock()
log.Info("updated cluster membership")
}
func (pm *ProtocolManager) maybeTriggerSnapshot() {
pm.mu.RLock()
appliedIndex := pm.appliedIndex
entriesSinceLastSnap := appliedIndex - pm.snapshotIndex
pm.mu.RUnlock()
if entriesSinceLastSnap < snapshotPeriod {
return
}
pm.triggerSnapshot(appliedIndex)
}
func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
if raftSnapshot := pm.readRaftSnapshot(); raftSnapshot != nil {
log.Info("loading snapshot")
pm.applyRaftSnapshot(*raftSnapshot)
return raftSnapshot
} else {
log.Info("no snapshot to load")
return nil
}
}
func (snapshot *Snapshot) toBytes() []byte {
size, r, err := rlp.EncodeToReader(snapshot)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Snapshot: %s", err.Error()))
}
var buffer = make([]byte, uint32(size))
r.Read(buffer)
return buffer
}
func bytesToSnapshot(bytes []byte) *Snapshot {
var snapshot Snapshot
if err := rlp.DecodeBytes(bytes, &snapshot); err != nil {
fatalf("failed to RLP-decode Snapshot", "err", err)
}
return &snapshot
}
func (snapshot *Snapshot) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{snapshot.addresses, snapshot.removedRaftIds, snapshot.headBlockHash})
}
func (snapshot *Snapshot) DecodeRLP(s *rlp.Stream) error {
// These fields need to be public:
var temp struct {
Addresses []Address
RemovedRaftIds []uint16
HeadBlockHash common.Hash
}
if err := s.Decode(&temp); err != nil {
return err
} else {
snapshot.addresses, snapshot.removedRaftIds, snapshot.headBlockHash = temp.Addresses, temp.RemovedRaftIds, temp.HeadBlockHash
return nil
}
}
// Raft snapshot
func (pm *ProtocolManager) saveRaftSnapshot(snap raftpb.Snapshot) error {
if err := pm.snapshotter.SaveSnap(snap); err != nil {
return err
}
@ -24,42 +247,7 @@ func (pm *ProtocolManager) saveSnapshot(snap raftpb.Snapshot) error {
return pm.wal.ReleaseLockTo(snap.Metadata.Index)
}
func (pm *ProtocolManager) maybeTriggerSnapshot() {
if pm.appliedIndex-pm.snapshotIndex < snapshotPeriod {
return
}
pm.triggerSnapshot()
}
func (pm *ProtocolManager) triggerSnapshot() {
log.Info("start snapshot", "applied index", pm.appliedIndex, "last snapshot index", pm.snapshotIndex)
snapData := pm.blockchain.CurrentBlock().Hash().Bytes()
snap, err := pm.raftStorage.CreateSnapshot(pm.appliedIndex, &pm.confState, snapData)
if err != nil {
panic(err)
}
if err := pm.saveSnapshot(snap); err != nil {
panic(err)
}
// Discard all log entries prior to appliedIndex.
if err := pm.raftStorage.Compact(pm.appliedIndex); err != nil {
panic(err)
}
log.Info("compacted log", "index", pm.appliedIndex)
pm.snapshotIndex = pm.appliedIndex
}
// For persisting cluster membership changes correctly, we need to trigger a
// snapshot before advancing our persisted appliedIndex in LevelDB.
//
// See handling of EntryConfChange entries in raft/handler.go for details.
func (pm *ProtocolManager) triggerSnapshotWithNextIndex(index uint64) {
pm.appliedIndex = index
pm.triggerSnapshot()
}
func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
func (pm *ProtocolManager) readRaftSnapshot() *raftpb.Snapshot {
snapshot, err := pm.snapshotter.Load()
if err != nil && err != snap.ErrNoSnapshot {
fatalf("error loading snapshot: %v", err)
@ -68,14 +256,78 @@ func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
return snapshot
}
func (pm *ProtocolManager) applySnapshot(snap raftpb.Snapshot) {
if err := pm.raftStorage.ApplySnapshot(snap); err != nil {
func (pm *ProtocolManager) applyRaftSnapshot(raftSnapshot raftpb.Snapshot) {
log.Info("applying snapshot to raft storage")
if err := pm.raftStorage.ApplySnapshot(raftSnapshot); err != nil {
fatalf("failed to apply snapshot: %s", err)
}
snapshot := bytesToSnapshot(raftSnapshot.Data)
snapMeta := snap.Metadata
latestBlockHash := snapshot.headBlockHash
pm.updateClusterMembership(raftSnapshot.Metadata.ConfState, snapshot.addresses, snapshot.removedRaftIds)
preSyncHead := pm.blockchain.CurrentBlock()
log.Info("before sync", "chain head", preSyncHead.Hash())
if latestBlock := pm.blockchain.GetBlockByHash(latestBlockHash); latestBlock == nil {
pm.syncBlockchainUntil(latestBlockHash)
pm.logNewlyAcceptedTransactions(preSyncHead)
log.Info(chainExtensionMessage, "hash", pm.blockchain.CurrentBlock().Hash())
} else {
log.Info("blockchain is caught up; no need to synchronize")
}
snapMeta := raftSnapshot.Metadata
pm.confState = snapMeta.ConfState
pm.mu.Lock()
pm.snapshotIndex = snapMeta.Index
pm.advanceAppliedIndex(snapMeta.Index)
pm.mu.Unlock()
}
func (pm *ProtocolManager) syncBlockchainUntil(hash common.Hash) {
pm.mu.RLock()
peerMap := make(map[uint16]*Peer, len(pm.peers))
for raftId, peer := range pm.peers {
peerMap[raftId] = peer
}
pm.mu.RUnlock()
for {
for peerId, peer := range peerMap {
log.Info("synchronizing with peer", "peer id", peerId, "hash", hash)
peerId := peer.p2pNode.ID.String()
peerIdPrefix := fmt.Sprintf("%x", peer.p2pNode.ID[:8])
if err := pm.downloader.Synchronise(peerIdPrefix, hash, big.NewInt(0), downloader.BoundedFullSync); err != nil {
log.Info("failed to synchronize with peer", "peer id", peerId)
time.Sleep(500 * time.Millisecond)
} else {
return
}
}
}
}
func (pm *ProtocolManager) logNewlyAcceptedTransactions(preSyncHead *types.Block) {
newHead := pm.blockchain.CurrentBlock()
numBlocks := newHead.NumberU64() - preSyncHead.NumberU64()
blocks := make([]*types.Block, numBlocks)
currBlock := newHead
blocksSeen := 0
for currBlock.Hash() != preSyncHead.Hash() {
blocks[int(numBlocks)-(1+blocksSeen)] = currBlock
blocksSeen += 1
currBlock = pm.blockchain.GetBlockByHash(currBlock.ParentHash())
}
for _, block := range blocks {
for _, tx := range block.Transactions() {
log.EmitCheckpoint(log.TxAccepted, tx.Hash().Hex())
}
}
}

View File

@ -9,7 +9,7 @@ import (
"github.com/ethereum/go-ethereum/log"
)
func (pm *ProtocolManager) openWAL(maybeSnapshot *raftpb.Snapshot) *wal.WAL {
func (pm *ProtocolManager) openWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WAL {
if !wal.Exist(pm.waldir) {
if err := os.Mkdir(pm.waldir, 0750); err != nil {
fatalf("cannot create waldir: %s", err)
@ -23,13 +23,13 @@ func (pm *ProtocolManager) openWAL(maybeSnapshot *raftpb.Snapshot) *wal.WAL {
}
walsnap := walpb.Snapshot{}
if maybeSnapshot != nil {
walsnap.Index = maybeSnapshot.Metadata.Index
walsnap.Term = maybeSnapshot.Metadata.Term
}
log.Info("loading WAL", "term", walsnap.Term, "index", walsnap.Index)
if maybeRaftSnapshot != nil {
walsnap.Index, walsnap.Term = maybeRaftSnapshot.Metadata.Index, maybeRaftSnapshot.Metadata.Term
}
wal, err := wal.Open(pm.waldir, walsnap)
if err != nil {
fatalf("error loading WAL: %s", err)
@ -38,20 +38,15 @@ func (pm *ProtocolManager) openWAL(maybeSnapshot *raftpb.Snapshot) *wal.WAL {
return wal
}
func (pm *ProtocolManager) replayWAL() *wal.WAL {
func (pm *ProtocolManager) replayWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WAL {
log.Info("replaying WAL")
maybeSnapshot := pm.loadSnapshot()
wal := pm.openWAL(maybeSnapshot)
wal := pm.openWAL(maybeRaftSnapshot)
_, hardState, entries, err := wal.ReadAll()
if err != nil {
fatalf("failed to read WAL: %s", err)
}
if maybeSnapshot != nil {
pm.applySnapshot(*maybeSnapshot)
}
pm.raftStorage.SetHardState(hardState)
pm.raftStorage.Append(entries)