package raft

import (
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/pkg/fileutil"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/wal"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/downloader"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/rlp"

	"github.com/coreos/etcd/etcdserver/stats"
	raftTypes "github.com/coreos/etcd/pkg/types"
	etcdRaft "github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/rafthttp"
	"github.com/syndtr/goleveldb/leveldb"

	"gopkg.in/fatih/set.v0"
)

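// ProtocolManager bridges the block minting/import pipeline with an embedded
// etcd raft node, managing the raft transport, write-ahead log, snapshots, and
// the cluster peer set.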
type ProtocolManager struct {
	mu       sync.RWMutex // For protecting concurrent JS access to "local peer" and "remote peer" state
	quitSync chan struct{}
	stopped  bool

	// Static configuration
	joinExisting   bool // Whether to join an existing cluster when a WAL doesn't already exist
	bootstrapNodes []*discover.Node
	raftId         uint16
	raftPort       uint16

	// Local peer state (protected by mu vs concurrent access via JS)
	address       *Address
	role          int    // Role: minter or verifier
	appliedIndex  uint64 // The index of the last-applied raft entry
	snapshotIndex uint64 // The index of the latest snapshot.

	// Remote peer state (protected by mu vs concurrent access via JS)
	leader       uint16
	peers        map[uint16]*Peer
	removedPeers *set.Set // *Permanently removed* peers

	// P2P transport
	p2pServer *p2p.Server // Initialized in start()

	// Blockchain services
	blockchain *core.BlockChain
	downloader *downloader.Downloader
	minter     *minter

	// Blockchain events
	eventMux      *event.TypeMux
	minedBlockSub *event.TypeMuxSubscription

	// Raft proposal events
	blockProposalC      chan *types.Block      // for mined blocks to raft
	confChangeProposalC chan raftpb.ConfChange // for config changes from js console to raft

	// Raft transport
	unsafeRawNode etcdRaft.Node
	transport     *rafthttp.Transport
	httpstopc     chan struct{}
	httpdonec     chan struct{}

	// Raft snapshotting
	snapshotter *snap.Snapshotter
	snapdir     string
	confState   raftpb.ConfState

	// Raft write-ahead log
	waldir string
	wal    *wal.WAL

	// Storage
	quorumRaftDb *leveldb.DB             // Persistent storage for last-applied raft index
	raftStorage  *etcdRaft.MemoryStorage // Volatile raft storage
}

//
// Public interface
//
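// NewProtocolManager creates the raft protocol manager and opens the quorum
// raft LevelDB state store under datadir; it does not yet start raft itself.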
func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockChain, mux *event.TypeMux, bootstrapNodes []*discover.Node, joinExisting bool, datadir string, minter *minter, downloader *downloader.Downloader) (*ProtocolManager, error) {
	waldir := fmt.Sprintf("%s/raft-wal", datadir)
	snapdir := fmt.Sprintf("%s/raft-snap", datadir)
	quorumRaftDbLoc := fmt.Sprintf("%s/quorum-raft-state", datadir)

	manager := &ProtocolManager{
		bootstrapNodes:      bootstrapNodes,
		peers:               make(map[uint16]*Peer),
		leader:              uint16(etcdRaft.None),
		removedPeers:        set.New(),
		joinExisting:        joinExisting,
		blockchain:          blockchain,
		eventMux:            mux,
		blockProposalC:      make(chan *types.Block),
		confChangeProposalC: make(chan raftpb.ConfChange),
		httpstopc:           make(chan struct{}),
		httpdonec:           make(chan struct{}),
		waldir:              waldir,
		snapdir:             snapdir,
		snapshotter:         snap.New(snapdir),
		raftId:              raftId,
		raftPort:            raftPort,
		quitSync:            make(chan struct{}),
		raftStorage:         etcdRaft.NewMemoryStorage(),
		minter:              minter,
		downloader:          downloader,
	}

	if db, err := openQuorumRaftDb(quorumRaftDbLoc); err != nil {
		return nil, err
	} else {
		manager.quorumRaftDb = db
	}

	return manager, nil
}

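// Start wires the protocol manager into the p2p server, subscribes to mined
// block events, and starts the underlying raft node and its worker goroutines.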
func (pm *ProtocolManager) Start(p2pServer *p2p.Server) {
	log.Info("starting raft protocol handler")

	pm.p2pServer = p2pServer
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	pm.startRaft()
	go pm.minedBroadcastLoop()
}

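// Stop tears the protocol manager down: it disconnects peers, stops the raft
// transport and node, stops the minter, and closes the persistent
// applied-index store. Repeated calls are no-ops.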
func (pm *ProtocolManager) Stop() {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	defer log.Info("raft protocol handler stopped")

	if pm.stopped {
		return
	}

	log.Info("stopping raft protocol handler...")

	for raftId, peer := range pm.peers {
		pm.disconnectFromPeer(raftId, peer)
	}

	pm.minedBlockSub.Unsubscribe()

	if pm.transport != nil {
		pm.transport.Stop()
	}

	close(pm.httpstopc)
	<-pm.httpdonec
	close(pm.quitSync)

	if pm.unsafeRawNode != nil {
		pm.unsafeRawNode.Stop()
	}

	pm.quorumRaftDb.Close()

	pm.p2pServer = nil

	pm.minter.stop()

	pm.stopped = true
}

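// NodeInfo returns a summary of this node's raft state: role, address,
// current and removed peers, and the applied and snapshot indexes.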
func (pm *ProtocolManager) NodeInfo() *RaftNodeInfo {
	pm.mu.RLock() // as we read role and peers
	defer pm.mu.RUnlock()

	var roleDescription string
	if pm.role == minterRole {
		roleDescription = "minter"
	} else {
		roleDescription = "verifier"
	}

	peerAddresses := make([]*Address, len(pm.peers))
	peerIdx := 0
	for _, peer := range pm.peers {
		peerAddresses[peerIdx] = peer.address
		peerIdx += 1
	}

	removedPeerIfaces := pm.removedPeers.List()
	removedPeerIds := make([]uint16, len(removedPeerIfaces))
	for i, removedIface := range removedPeerIfaces {
		removedPeerIds[i] = removedIface.(uint16)
	}

	//
	// NOTE: before exposing any new fields here, make sure that the underlying
	// ProtocolManager members are protected from concurrent access by pm.mu!
	//
	return &RaftNodeInfo{
		ClusterSize:    len(pm.peers) + 1,
		Role:           roleDescription,
		Address:        pm.address,
		PeerAddresses:  peerAddresses,
		RemovedPeerIds: removedPeerIds,
		AppliedIndex:   pm.appliedIndex,
		SnapshotIndex:  pm.snapshotIndex,
	}
}

// There seems to be a very rare race in raft where during `etcdRaft.StartNode`
// it will call back our `Process` method before it's finished returning the
// `raft.Node`, `pm.unsafeRawNode`, to us. This re-entrance through a separate
// thread will cause a nil pointer dereference. To work around this, this
// getter method should be used instead of reading `pm.unsafeRawNode` directly.
func (pm *ProtocolManager) rawNode() etcdRaft.Node {
	for pm.unsafeRawNode == nil {
		time.Sleep(100 * time.Millisecond)
	}

	return pm.unsafeRawNode
}

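// nextRaftId returns the lowest raft ID that has never been used in this
// cluster: one greater than the highest ID among ourselves, current peers, and
// permanently removed peers.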
func (pm *ProtocolManager) nextRaftId() uint16 {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	maxId := pm.raftId

	for peerId := range pm.peers {
		if maxId < peerId {
			maxId = peerId
		}
	}

	removedPeerIfaces := pm.removedPeers.List()
	for _, removedIface := range removedPeerIfaces {
		removedId := removedIface.(uint16)

		if maxId < removedId {
			maxId = removedId
		}
	}

	return maxId + 1
}

func (pm *ProtocolManager) isRaftIdRemoved(id uint16) bool {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	return pm.removedPeers.Has(id)
}

func (pm *ProtocolManager) isRaftIdUsed(raftId uint16) bool {
	if pm.raftId == raftId || pm.isRaftIdRemoved(raftId) {
		return true
	}

	pm.mu.RLock()
	defer pm.mu.RUnlock()

	return pm.peers[raftId] != nil
}

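// isNodeAlreadyInCluster reports, via a descriptive error, whether the given
// enode is already part of the cluster, either by node ID or by a conflicting
// IP/port combination.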
func (pm *ProtocolManager) isNodeAlreadyInCluster(node *discover.Node) error {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	for _, peer := range pm.peers {
		peerRaftId := peer.address.RaftId
		peerNode := peer.p2pNode

		if peerNode.ID == node.ID {
			return fmt.Errorf("node with this enode has already been added to the cluster: %v", node.ID)
		}

		if peerNode.IP.Equal(node.IP) {
			if peerNode.TCP == node.TCP {
				return fmt.Errorf("existing node %v with raft ID %v is already using eth p2p at %v:%v", peerNode.ID, peerRaftId, node.IP, node.TCP)
			} else if peer.address.RaftPort == node.RaftPort {
				return fmt.Errorf("existing node %v with raft ID %v is already using raft at %v:%v", peerNode.ID, peerRaftId, node.IP, node.RaftPort)
			}
		}
	}

	return nil
}

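// ProposeNewPeer validates the given enode URL and proposes a
// ConfChangeAddNode for it to raft, returning the raft ID assigned to the new
// peer.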
func (pm *ProtocolManager) ProposeNewPeer(enodeId string) (uint16, error) {
	node, err := discover.ParseNode(enodeId)
	if err != nil {
		return 0, err
	}

	if len(node.IP) != 4 {
		return 0, fmt.Errorf("expected IPv4 address (with length 4), but got IP of length %v", len(node.IP))
	}

	if !node.HasRaftPort() {
		return 0, fmt.Errorf("enodeId is missing raftport querystring parameter: %v", enodeId)
	}

	if err := pm.isNodeAlreadyInCluster(node); err != nil {
		return 0, err
	}

	raftId := pm.nextRaftId()
	address := newAddress(raftId, node.RaftPort, node)

	pm.confChangeProposalC <- raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  uint64(raftId),
		Context: address.toBytes(),
	}

	return raftId, nil
}

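// ProposePeerRemoval proposes a ConfChangeRemoveNode for the given raft ID.
// The removal is permanent: the ID can never rejoin the cluster.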
func (pm *ProtocolManager) ProposePeerRemoval(raftId uint16) {
	pm.confChangeProposalC <- raftpb.ConfChange{
		Type:   raftpb.ConfChangeRemoveNode,
		NodeID: uint64(raftId),
	}
}

//
// MsgWriter interface (necessary for p2p.Send)
//
func (pm *ProtocolManager) WriteMsg(msg p2p.Msg) error {
	// read *into* buffer
	var buffer = make([]byte, msg.Size)
	msg.Payload.Read(buffer)

	return pm.rawNode().Propose(context.TODO(), buffer)
}

//
// Raft interface
//
func (pm *ProtocolManager) Process(ctx context.Context, m raftpb.Message) error {
	return pm.rawNode().Step(ctx, m)
}

func (pm *ProtocolManager) IsIDRemoved(id uint64) bool {
	return pm.isRaftIdRemoved(uint16(id))
}

func (pm *ProtocolManager) ReportUnreachable(id uint64) {
	log.Info("peer is currently unreachable", "peer id", id)

	pm.rawNode().ReportUnreachable(id)
}

func (pm *ProtocolManager) ReportSnapshot(id uint64, status etcdRaft.SnapshotStatus) {
	if status == etcdRaft.SnapshotFailure {
		log.Info("failed to send snapshot", "raft peer", id)
	} else if status == etcdRaft.SnapshotFinish {
		log.Info("finished sending snapshot", "raft peer", id)
	}

	pm.rawNode().ReportSnapshot(id, status)
}

//
// Private methods
//
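// startRaft creates the raft transport and either remounts an existing raft
// log, joins an existing cluster, or bootstraps a brand-new cluster from the
// static bootstrap nodes, then spawns the raft worker goroutines.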
func (pm *ProtocolManager) startRaft() {
	if !fileutil.Exist(pm.snapdir) {
		if err := os.Mkdir(pm.snapdir, 0750); err != nil {
			fatalf("cannot create dir for snapshot (%v)", err)
		}
	}
	walExisted := wal.Exist(pm.waldir)
	lastAppliedIndex := pm.loadAppliedIndex()

	ss := &stats.ServerStats{}
	ss.Initialize()

	pm.transport = &rafthttp.Transport{
		ID:          raftTypes.ID(pm.raftId),
		ClusterID:   0x1000,
		Raft:        pm,
		ServerStats: ss,
		LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(pm.raftId))),
		ErrorC:      make(chan error),
	}
	pm.transport.Start()

	// We load the snapshot to connect to prev peers before replaying the WAL,
	// which typically goes further into the future than the snapshot.

	var maybeRaftSnapshot *raftpb.Snapshot

	if walExisted {
		maybeRaftSnapshot = pm.loadSnapshot() // re-establishes peer connections
	}

	pm.wal = pm.replayWAL(maybeRaftSnapshot)

	if walExisted {
		if hardState, _, err := pm.raftStorage.InitialState(); err != nil {
			panic(fmt.Sprintf("failed to read initial state from raft while restarting: %v", err))
		} else {
			if lastPersistedCommittedIndex := hardState.Commit; lastPersistedCommittedIndex < lastAppliedIndex {
				log.Info("rolling back applied index to last-durably-committed", "last applied index", lastAppliedIndex, "last persisted index", lastPersistedCommittedIndex)

				// Roll back our applied index. See the logic and explanation around
				// the single call to `pm.applyNewChainHead` for more context.
				lastAppliedIndex = lastPersistedCommittedIndex
			}
		}
	}

	// NOTE: cockroach sets this to false for now until they've "worked out the
	// bugs"
	enablePreVote := true

	raftConfig := &etcdRaft.Config{
		Applied:       lastAppliedIndex,
		ID:            uint64(pm.raftId),
		ElectionTick:  10, // NOTE: cockroach sets this to 15
		HeartbeatTick: 1,  // NOTE: cockroach sets this to 5
		Storage:       pm.raftStorage,

		// NOTE, from cockroach:
		// "PreVote and CheckQuorum are two ways of achieving the same thing.
		// PreVote is more compatible with quiesced ranges, so we want to switch
		// to it once we've worked out the bugs."
		//
		// TODO: vendor again?
		// PreVote: enablePreVote,
		CheckQuorum: !enablePreVote,

		// MaxSizePerMsg controls how many Raft log entries the leader will send to
		// followers in a single MsgApp.
		MaxSizePerMsg: 4096, // NOTE: in cockroachdb this is 16*1024

		// MaxInflightMsgs controls how many in-flight messages Raft will send to
		// a follower without hearing a response. The total number of Raft log
		// entries is a combination of this setting and MaxSizePerMsg.
		//
		// NOTE: Cockroach's settings (MaxSizePerMsg of 4k and MaxInflightMsgs
		// of 4) provide for up to 64 KB of raft log to be sent without
		// acknowledgement. With an average entry size of 1 KB that translates
		// to ~64 commands that might be executed in the handling of a single
		// etcdraft.Ready operation.
		MaxInflightMsgs: 256, // NOTE: in cockroachdb this is 4
	}

	log.Info("startRaft", "raft ID", raftConfig.ID)

	if walExisted {
		log.Info("remounting an existing raft log; connecting to peers.")

		pm.unsafeRawNode = etcdRaft.RestartNode(raftConfig)
	} else if pm.joinExisting {
		log.Info("newly joining an existing cluster; waiting for connections.")
		pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, nil)
	} else {
		if numPeers := len(pm.bootstrapNodes); numPeers == 0 {
			panic("exiting due to empty raft peers list")
		} else {
			log.Info("starting a new raft log", "initial cluster size of", numPeers)
		}

		raftPeers, peerAddresses, localAddress := pm.makeInitialRaftPeers()

		pm.setLocalAddress(localAddress)

		// We add all peers up-front even though we will see a ConfChangeAddNode
		// for each shortly. This is because raft's ConfState will contain all of
		// these nodes before we see these log entries, and we always want our
		// snapshots to have all addresses for each of the nodes in the ConfState.
		for _, peerAddress := range peerAddresses {
			pm.addPeer(peerAddress)
		}

		pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, raftPeers)
	}

	go pm.serveRaft()
	go pm.serveLocalProposals()
	go pm.eventLoop()
	go pm.handleRoleChange(pm.rawNode().RoleChan().Out())
}

func (pm *ProtocolManager) setLocalAddress(addr *Address) {
	pm.mu.Lock()
	pm.address = addr
	pm.mu.Unlock()

	// By setting `URLs` on the raft transport, we advertise our URL (in an HTTP
	// header) to any recipient. This is necessary for a newcomer to the cluster
	// to be able to accept a snapshot from us to bootstrap them.
	if urls, err := raftTypes.NewURLs([]string{raftUrl(addr)}); err == nil {
		pm.transport.URLs = urls
	} else {
		panic(fmt.Sprintf("error: could not create URL from local address: %v", addr))
	}
}

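// serveRaft listens on the local raft port and serves the rafthttp transport
// until Stop closes httpstopc.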
func (pm *ProtocolManager) serveRaft() {
	urlString := fmt.Sprintf("http://0.0.0.0:%d", pm.raftPort)
	url, err := url.Parse(urlString)
	if err != nil {
		fatalf("Failed parsing URL (%v)", err)
	}

	listener, err := newStoppableListener(url.Host, pm.httpstopc)
	if err != nil {
		fatalf("Failed to listen rafthttp (%v)", err)
	}
	err = (&http.Server{Handler: pm.transport.Handler()}).Serve(listener)
	select {
	case <-pm.httpstopc:
	default:
		fatalf("Failed to serve rafthttp (%v)", err)
	}
	close(pm.httpdonec)
}

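// handleRoleChange starts or stops the block minter as this node's raft role
// changes between minter and verifier.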
func (pm *ProtocolManager) handleRoleChange(roleC <-chan interface{}) {
	for {
		select {
		case role := <-roleC:
			intRole, ok := role.(int)

			if !ok {
				panic("Couldn't cast role to int")
			}

			if intRole == minterRole {
				log.EmitCheckpoint(log.BecameMinter)
				pm.minter.start()
			} else { // verifier
				log.EmitCheckpoint(log.BecameVerifier)
				pm.minter.stop()
			}

			pm.mu.Lock()
			pm.role = intRole
			pm.mu.Unlock()

		case <-pm.quitSync:
			return
		}
	}
}

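// minedBroadcastLoop forwards locally mined blocks to the raft proposal
// channel until the subscription is closed or the node shuts down.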
func (pm *ProtocolManager) minedBroadcastLoop() {
	for obj := range pm.minedBlockSub.Chan() {
		switch ev := obj.Data.(type) {
		case core.NewMinedBlockEvent:
			select {
			case pm.blockProposalC <- ev.Block:
			case <-pm.quitSync:
				return
			}
		}
	}
}

// Serve two channels to handle new blocks and raft configuration changes originating locally.
func (pm *ProtocolManager) serveLocalProposals() {
	//
	// TODO: does it matter that this will restart from 0 whenever we restart a cluster?
	//
	var confChangeCount uint64

	for {
		select {
		case block, ok := <-pm.blockProposalC:
			if !ok {
				log.Info("error: read from proposeC failed")
				return
			}

			size, r, err := rlp.EncodeToReader(block)
			if err != nil {
				panic(fmt.Sprintf("error: failed to send RLP-encoded block: %s", err.Error()))
			}
			var buffer = make([]byte, uint32(size))
			r.Read(buffer)

			// blocks until accepted by the raft state machine
			pm.rawNode().Propose(context.TODO(), buffer)
		case cc, ok := <-pm.confChangeProposalC:
			if !ok {
				log.Info("error: read from confChangeC failed")
				return
			}

			confChangeCount++
			cc.ID = confChangeCount
			pm.rawNode().ProposeConfChange(context.TODO(), cc)
		case <-pm.quitSync:
			return
		}
	}
}

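// entriesToApply filters committed entries down to the ones we have not yet
// applied, based on pm.appliedIndex.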
func (pm *ProtocolManager) entriesToApply(allEntries []raftpb.Entry) (entriesToApply []raftpb.Entry) {
	if len(allEntries) == 0 {
		return
	}

	first := allEntries[0].Index
	pm.mu.RLock()
	lastApplied := pm.appliedIndex
	pm.mu.RUnlock()

	if first > lastApplied+1 {
		fatalf("first index of committed entry[%d] should <= appliedIndex[%d] + 1", first, lastApplied)
	}

	firstToApply := lastApplied - first + 1

	if firstToApply < uint64(len(allEntries)) {
		entriesToApply = allEntries[firstToApply:]
	}
	return
}

func raftUrl(address *Address) string {
	return fmt.Sprintf("http://%s:%d", address.Ip, address.RaftPort)
}

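// addPeer registers a peer with both the eth p2p server and the raft
// transport, and records it in pm.peers.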
func (pm *ProtocolManager) addPeer(address *Address) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	raftId := address.RaftId

	// Add P2P connection:
	p2pNode := discover.NewNode(address.NodeId, address.Ip, 0, uint16(address.P2pPort))
	pm.p2pServer.AddPeer(p2pNode)

	// Add raft transport connection:
	pm.transport.AddPeer(raftTypes.ID(raftId), []string{raftUrl(address)})

	pm.peers[raftId] = &Peer{address, p2pNode}
}

func (pm *ProtocolManager) disconnectFromPeer(raftId uint16, peer *Peer) {
	pm.p2pServer.RemovePeer(peer.p2pNode)
	pm.transport.RemovePeer(raftTypes.ID(raftId))
}

func (pm *ProtocolManager) removePeer(raftId uint16) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	if peer := pm.peers[raftId]; peer != nil {
		pm.disconnectFromPeer(raftId, peer)

		delete(pm.peers, raftId)
	}

	// This is only necessary sometimes, but it's idempotent. Also, we *always*
	// do this, and not just when there's still a peer in the map, because we
	// need to do it for our *own* raft ID before we get booted from the cluster
	// so that snapshots are identical on all nodes. It's important for a booted
	// node to have a snapshot identical to every other node because that node
	// can potentially re-enter the cluster with a new raft ID.
	pm.removedPeers.Add(raftId)
}

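// eventLoop is the main raft loop: it ticks the raft node, persists and sends
// each Ready batch, applies committed block and config-change entries to the
// chain and peer set, triggers snapshots, and exits if this node is removed
// from the cluster.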
func (pm *ProtocolManager) eventLoop() {
	ticker := time.NewTicker(tickerMS * time.Millisecond)
	defer ticker.Stop()
	defer pm.wal.Close()

	exitAfterApplying := false

	for {
		select {
		case <-ticker.C:
			pm.rawNode().Tick()

		// when the node is first ready it gives us entries to commit and messages
		// to immediately publish
		case rd := <-pm.rawNode().Ready():
			pm.wal.Save(rd.HardState, rd.Entries)

			if rd.SoftState != nil {
				pm.updateLeader(rd.SoftState.Lead)
			}

			if snap := rd.Snapshot; !etcdRaft.IsEmptySnap(snap) {
				pm.saveRaftSnapshot(snap)
				pm.applyRaftSnapshot(snap)
				pm.advanceAppliedIndex(snap.Metadata.Index)
			}

			// 1: Write HardState, Entries, and Snapshot to persistent storage if they
			// are not empty.
			pm.raftStorage.Append(rd.Entries)

			// 2: Send all Messages to the nodes named in the To field.
			pm.transport.Send(rd.Messages)

			// 3: Apply Snapshot (if any) and CommittedEntries to the state machine.
			for _, entry := range pm.entriesToApply(rd.CommittedEntries) {
				switch entry.Type {
				case raftpb.EntryNormal:
					if len(entry.Data) == 0 {
						break
					}
					var block types.Block
					err := rlp.DecodeBytes(entry.Data, &block)
					if err != nil {
						log.Error("error decoding block: ", err)
					}

					if pm.blockchain.HasBlock(block.Hash(), block.NumberU64()) {
						// This can happen:
						//
						// if (1) we crashed after applying this block to the chain, but
						//        before writing appliedIndex to LDB.
						// or (2) we crashed in a scenario where we applied further than
						//        raft *durably persisted* its committed index (see
						//        https://github.com/coreos/etcd/pull/7899). In this
						//        scenario, when the node comes back up, we will re-apply
						//        a few entries.
						headBlockHash := pm.blockchain.CurrentBlock().Hash()
						log.Warn("not applying already-applied block", "block hash", block.Hash(), "parent", block.ParentHash(), "head", headBlockHash)
					} else {
						pm.applyNewChainHead(&block)
					}

				case raftpb.EntryConfChange:
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					raftId := uint16(cc.NodeID)

					pm.confState = *pm.rawNode().ApplyConfChange(cc)

					forceSnapshot := false

					switch cc.Type {
					case raftpb.ConfChangeAddNode:
						if pm.isRaftIdRemoved(raftId) {
							log.Info("ignoring ConfChangeAddNode for permanently-removed peer", "raft id", raftId)
						} else if raftId <= uint16(len(pm.bootstrapNodes)) {
							// See initial cluster logic in startRaft() for more information.
							log.Info("ignoring expected ConfChangeAddNode for initial peer", "raft id", raftId)

							// We need a snapshot to exist to reconnect to peers on start-up after a crash.
							forceSnapshot = true
						} else if pm.isRaftIdUsed(raftId) {
							log.Info("ignoring ConfChangeAddNode for already-used raft ID", "raft id", raftId)
						} else {
							log.Info("adding peer due to ConfChangeAddNode", "raft id", raftId)

							forceSnapshot = true
							pm.addPeer(bytesToAddress(cc.Context))
						}

					case raftpb.ConfChangeRemoveNode:
						if pm.isRaftIdRemoved(raftId) {
							log.Info("ignoring ConfChangeRemoveNode for already-removed peer", "raft id", raftId)
						} else {
							log.Info("removing peer due to ConfChangeRemoveNode", "raft id", raftId)

							forceSnapshot = true

							if raftId == pm.raftId {
								exitAfterApplying = true
							}

							pm.removePeer(raftId)
						}

					case raftpb.ConfChangeUpdateNode:
						// NOTE: remember to forceSnapshot in this case, if we add support
						// for this.
						fatalf("not yet handled: ConfChangeUpdateNode")
					}

					if forceSnapshot {
						// We force a snapshot here to persist our updated confState, so we
						// know our fellow cluster members when we come back online.
						//
						// It is critical here to snapshot *before* writing our applied
						// index in LevelDB, otherwise a crash while/before snapshotting
						// (after advancing our applied index) would result in the loss of a
						// cluster member upon restart: we would re-mount with an old
						// ConfState.
						pm.triggerSnapshot(entry.Index)
					}
				}

				pm.advanceAppliedIndex(entry.Index)
			}

			pm.maybeTriggerSnapshot()

			if exitAfterApplying {
				log.Warn("permanently removing self from the cluster")
				pm.Stop()
				log.Warn("permanently exited the cluster")

				return
			}

			// 4: Call Node.Advance() to signal readiness for the next batch of
			// updates.
			pm.rawNode().Advance()

		case <-pm.quitSync:
			return
		}
	}
}

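// makeInitialRaftPeers derives the initial raft peer set from the static
// bootstrap nodes, assigning raft IDs 1..n in list order, and returns the
// addresses of the remote peers along with our own address.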
func (pm *ProtocolManager) makeInitialRaftPeers() (raftPeers []etcdRaft.Peer, peerAddresses []*Address, localAddress *Address) {
	initialNodes := pm.bootstrapNodes
	raftPeers = make([]etcdRaft.Peer, len(initialNodes))  // Entire cluster
	peerAddresses = make([]*Address, len(initialNodes)-1) // Cluster without *this* node

	peersSeen := 0
	for i, node := range initialNodes {
		raftId := uint16(i + 1)
		// We initially get the raftPort from the enode ID's query string. As an alternative, we can move away from
		// requiring the use of static peers for the initial set, and load them from e.g. another JSON file which
		// contains pairs of enodes and raft ports, or we can get this initial peer list from commandline flags.
		address := newAddress(raftId, node.RaftPort, node)

		raftPeers[i] = etcdRaft.Peer{
			ID:      uint64(raftId),
			Context: address.toBytes(),
		}

		if raftId == pm.raftId {
			localAddress = address
		} else {
			peerAddresses[peersSeen] = address
			peersSeen += 1
		}
	}

	return
}

func sleep(duration time.Duration) {
	<-time.NewTimer(duration).C
}

func blockExtendsChain(block *types.Block, chain *core.BlockChain) bool {
	return block.ParentHash() == chain.CurrentBlock().Hash()
}

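// applyNewChainHead validates and inserts a raft-ordered block into the local
// chain, or notifies the minter of an invalid (non-extending) raft ordering.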
func (pm *ProtocolManager) applyNewChainHead(block *types.Block) {
	if !blockExtendsChain(block, pm.blockchain) {
		headBlock := pm.blockchain.CurrentBlock()

		log.Info("Non-extending block", "block", block.Hash(), "parent", block.ParentHash(), "head", headBlock.Hash())

		pm.minter.invalidRaftOrderingChan <- InvalidRaftOrdering{headBlock: headBlock, invalidBlock: block}
	} else {
		if existingBlock := pm.blockchain.GetBlockByHash(block.Hash()); nil == existingBlock {
			if err := pm.blockchain.Validator().ValidateBody(block); err != nil {
				panic(fmt.Sprintf("failed to validate block %x (%v)", block.Hash(), err))
			}
		}

		for _, tx := range block.Transactions() {
			log.EmitCheckpoint(log.TxAccepted, "tx", tx.Hash().Hex())
		}

		_, err := pm.blockchain.InsertChain([]*types.Block{block})

		if err != nil {
			panic(fmt.Sprintf("failed to extend chain: %s", err.Error()))
		}

		log.EmitCheckpoint(log.BlockCreated, "block", fmt.Sprintf("%x", block.Hash()))
	}
}

// Sets new appliedIndex in-memory, *and* writes this appliedIndex to LevelDB.
func (pm *ProtocolManager) advanceAppliedIndex(index uint64) {
	pm.writeAppliedIndex(index)

	pm.mu.Lock()
	pm.appliedIndex = index
	pm.mu.Unlock()
}

func (pm *ProtocolManager) updateLeader(leader uint64) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	pm.leader = uint16(leader)
}

// The Address for the current leader, or an error if no leader is elected.
func (pm *ProtocolManager) LeaderAddress() (*Address, error) {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	if minterRole == pm.role {
		return pm.address, nil
	} else if l, ok := pm.peers[pm.leader]; ok {
		return l.address, nil
	}
	// We expect to reach this if pm.leader is 0, which is how etcd denotes the lack of a leader.
	return nil, errors.New("no leader is currently elected")
}

// Returns the raft id for a given enodeId
func (pm *ProtocolManager) FetchRaftId(enodeId string) (uint16, error) {
	node, err := discover.ParseNode(enodeId)
	if err != nil {
		return 0, err
	}
	for raftId, peer := range pm.peers {
		if peer.p2pNode.ID == node.ID {
			return raftId, nil
		}
	}
	return 0, fmt.Errorf("node not found in the cluster: %v", enodeId)
}