
382 lines
11 KiB
Raw Normal View History

package raft
import (
2017-08-28 14:43:44 -07:00
mapset "github.com/deckarep/golang-set"
2017-08-28 14:43:44 -07:00
2017-08-28 14:43:44 -07:00
type SnapshotWithHostnames struct {
Addresses []Address
RemovedRaftIds []uint16
HeadBlockHash common.Hash
type AddressWithoutHostname struct {
RaftId uint16
NodeId enode.EnodeID
Ip net.IP
P2pPort enr.TCP
RaftPort enr.RaftPort
type SnapshotWithoutHostnames struct {
Addresses []AddressWithoutHostname
RemovedRaftIds []uint16 // Raft IDs for permanently removed peers
HeadBlockHash common.Hash
2017-08-28 14:43:44 -07:00
// ByRaftId orders a slice of Address values by ascending RaftId. Its Len,
// Swap and Less methods have the signatures required by sort.Interface.
type ByRaftId []Address

// Len reports how many addresses the slice holds.
func (addrs ByRaftId) Len() int {
	return len(addrs)
}

// Swap exchanges the addresses stored at positions i and j.
func (addrs ByRaftId) Swap(i, j int) {
	tmp := addrs[i]
	addrs[i] = addrs[j]
	addrs[j] = tmp
}

// Less reports whether the address at i has a smaller RaftId than the one at j.
func (addrs ByRaftId) Less(i, j int) bool {
	return addrs[i].RaftId < addrs[j].RaftId
}
2017-08-28 14:43:44 -07:00
// buildSnapshot assembles a SnapshotWithHostnames from the manager's current
// state: one Address per node and learner listed in pm.confState, the IDs held
// in pm.removedPeers, and the hash of the blockchain's current block.
//
// NOTE(review): this copy of the function looks lossy. The lock acquisition
// that pairs with the deferred RUnlock, the closing braces, and any statement
// advancing the removed-ID index are not visible here; confirm against the
// original file before editing the code.
func (pm *ProtocolManager) buildSnapshot() *SnapshotWithHostnames {
2017-08-28 14:43:44 -07:00
defer pm.mu.RUnlock()
// Size both result slices up front from the current membership.
numNodes := len(pm.confState.Nodes) + len(pm.confState.Learners)
numRemovedNodes := pm.removedPeers.Cardinality()
2017-08-28 14:43:44 -07:00
snapshot := &SnapshotWithHostnames{
Addresses: make([]Address, numNodes),
RemovedRaftIds: make([]uint16, numRemovedNodes),
HeadBlockHash: pm.blockchain.CurrentBlock().Hash(),
2017-08-28 14:43:44 -07:00
// Populate addresses
// NOTE(review): append may write into spare capacity of pm.confState.Nodes'
// backing array; confirm that is acceptable while only the read lock is held.
for i, rawRaftId := range append(pm.confState.Nodes, pm.confState.Learners...) {
2017-08-28 14:43:44 -07:00
raftId := uint16(rawRaftId)
// Our own address lives on pm.address; every other ID is looked up in pm.peers.
if raftId == pm.raftId {
snapshot.Addresses[i] = *pm.address
2017-08-28 14:43:44 -07:00
} else {
// Assumes every non-local ID in confState has an entry in pm.peers;
// a missing entry would be a nil dereference here — TODO confirm.
snapshot.Addresses[i] = *pm.peers[raftId].address
2017-08-28 14:43:44 -07:00
2017-08-28 14:43:44 -07:00
// Populate removed IDs
i := 0
for removedIface := range pm.removedPeers.Iterator().C {
// Set elements are asserted to be uint16.
snapshot.RemovedRaftIds[i] = removedIface.(uint16)
2017-08-28 14:43:44 -07:00
return snapshot
2017-08-28 14:43:44 -07:00
// triggerSnapshot creates a raft snapshot at the given index from the data
// produced by buildSnapshot, saves it, compacts the raft log up to that index,
// and records the index in pm.snapshotIndex.
//
// Note that we do *not* read `pm.appliedIndex` here. We only use the `index`
// parameter instead. This is because we need to support a scenario when we
// snapshot for a future index that we have not yet recorded in LevelDB. See
// comments around the use of `forceSnapshot`.
//
// NOTE(review): the bodies of the three error branches are not visible in this
// copy of the function; confirm how each failure is handled in the original.
func (pm *ProtocolManager) triggerSnapshot(index uint64) {
snapshotIndex := pm.snapshotIndex
// NOTE(review): this log line does read pm.appliedIndex, despite the header
// comment; only the log output is affected.
log.Info("start snapshot", "applied index", pm.appliedIndex, "last snapshot index", snapshotIndex)
2017-08-28 14:43:44 -07:00
//snapData := pm.blockchain.CurrentBlock().Hash().Bytes()
//snap, err := pm.raftStorage.CreateSnapshot(pm.appliedIndex, &pm.confState, snapData)
// Serialise the snapshot in the layout selected by pm.useDns.
snapData := pm.buildSnapshot().toBytes(pm.useDns)
2017-08-28 14:43:44 -07:00
snap, err := pm.raftStorage.CreateSnapshot(index, &pm.confState, snapData)
if err != nil {
2017-08-28 14:43:44 -07:00
if err := pm.saveRaftSnapshot(snap); err != nil {
2017-08-28 14:43:44 -07:00
// Discard all log entries prior to index.
if err := pm.raftStorage.Compact(index); err != nil {
// NOTE(review): compaction used `index`, but the value logged is
// pm.appliedIndex; the two can differ per the header comment.
log.Info("compacted log", "index", pm.appliedIndex)
2017-08-28 14:43:44 -07:00
pm.snapshotIndex = index
func confStateIdSet(confState raftpb.ConfState) mapset.Set {
set := mapset.NewSet()
for _, rawRaftId := range append(confState.Nodes, confState.Learners...) {
2017-08-28 14:43:44 -07:00
return set
2017-08-28 14:43:44 -07:00
// updateClusterMembership brings the manager's view of the cluster in line
// with a raft snapshot: it rebuilds the removed-peer set from removedRaftIds,
// handles peers present in the previous ConfState but absent from
// newConfState, walks the supplied addresses for the local node and for remote
// peers, and finally stores newConfState in pm.confState.
//
// NOTE(review): several loop and branch bodies are not visible in this copy
// (what is added to removedPeers, how an old peer is removed, what happens for
// the local address and for a new peer). Confirm against the original file.
func (pm *ProtocolManager) updateClusterMembership(newConfState raftpb.ConfState, addresses []Address, removedRaftIds []uint16) {
log.Info("updating cluster membership per raft snapshot")
2017-08-28 14:43:44 -07:00
// Keep the outgoing ConfState so it can be diffed against the new one below.
prevConfState := pm.confState
// Update tombstones for permanently removed peers. For simplicity we do not
// allow the re-use of peer IDs once a peer is removed.
removedPeers := mapset.NewSet()
2017-08-28 14:43:44 -07:00
for _, removedRaftId := range removedRaftIds {
pm.removedPeers = removedPeers
// Remove old peers that we're still connected to
prevIds := confStateIdSet(prevConfState)
newIds := confStateIdSet(newConfState)
// IDs in the previous membership that the new membership no longer lists.
idsToRemove := prevIds.Difference(newIds)
for idIfaceToRemove := range idsToRemove.Iterator().C {
2017-08-28 14:43:44 -07:00
raftId := idIfaceToRemove.(uint16)
log.Info("removing old raft peer", "peer id", raftId)
2017-08-28 14:43:44 -07:00
// Update local and remote addresses
for _, tempAddress := range addresses {
address := tempAddress // Allocate separately on the heap for each iteration.
if address.RaftId == pm.raftId {
2017-08-28 14:43:44 -07:00
// If we're a newcomer to an existing cluster, this is where we learn
// our own Address.
} else {
// A nil map entry means we have no peer recorded for this raft ID yet.
existingPeer := pm.peers[address.RaftId]
2017-08-28 14:43:44 -07:00
if existingPeer == nil {
log.Info("adding new raft peer", "raft id", address.RaftId)
2017-08-28 14:43:44 -07:00
pm.confState = newConfState
log.Info("updated cluster membership")
2017-08-28 14:43:44 -07:00
// maybeTriggerSnapshot compares the number of entries applied since the last
// snapshot against snapshotPeriod.
//
// NOTE(review): the body of the threshold check and everything after it are
// not visible in this copy; presumably it returns early below the threshold
// and otherwise triggers a snapshot — confirm against the original file.
func (pm *ProtocolManager) maybeTriggerSnapshot() {
appliedIndex := pm.appliedIndex
// Distance, in applied entries, from the last recorded snapshot.
entriesSinceLastSnap := appliedIndex - pm.snapshotIndex
if entriesSinceLastSnap < snapshotPeriod {
// loadSnapshot returns the snapshot obtained from readRaftSnapshot, or nil
// when there is none, logging which of the two cases occurred.
//
// NOTE(review): a statement may be missing between the "loading snapshot" log
// line and the return in this copy; confirm against the original file.
func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
2017-08-28 14:43:44 -07:00
if raftSnapshot := pm.readRaftSnapshot(); raftSnapshot != nil {
log.Info("loading snapshot")
2017-08-28 14:43:44 -07:00
return raftSnapshot
} else {
log.Info("no snapshot to load")
2017-08-28 14:43:44 -07:00
return nil
// toBytes RLP-encodes the snapshot. With useDns set, the snapshot is encoded
// as-is; otherwise it is first converted to a SnapshotWithoutHostnames and
// that value is encoded. An encoding failure panics.
//
// NOTE(review): the fields of the AddressWithoutHostname literal built in the
// conversion loop are not visible in this copy, so how a hostname is turned
// into an IP cannot be confirmed from here.
func (snapshot *SnapshotWithHostnames) toBytes(useDns bool) []byte {
// we have DNS enabled, so only use the new snapshot type
if useDns {
buffer, err := rlp.EncodeToBytes(snapshot)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Snapshot: %s", err.Error()))
return buffer
// DNS is not enabled, use the old snapshot type, converting from hostnames to IP addresses
oldSnapshot := new(SnapshotWithoutHostnames)
// The head hash and removed IDs carry over unchanged; only addresses are converted.
oldSnapshot.HeadBlockHash, oldSnapshot.RemovedRaftIds = snapshot.HeadBlockHash, snapshot.RemovedRaftIds
oldSnapshot.Addresses = make([]AddressWithoutHostname, len(snapshot.Addresses))
for index, addrWithHost := range snapshot.Addresses {
oldSnapshot.Addresses[index] = AddressWithoutHostname{
buffer, err := rlp.EncodeToBytes(oldSnapshot)
2017-08-28 14:43:44 -07:00
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Snapshot: %s", err.Error()))
return buffer
func bytesToSnapshot(input []byte) *SnapshotWithHostnames {
var err, errOld error
snapshot := new(SnapshotWithHostnames)
streamNewSnapshot := rlp.NewStream(bytes.NewReader(input), 0)
if err = streamNewSnapshot.Decode(snapshot); err == nil {
return snapshot
2017-08-28 14:43:44 -07:00
snapshotOld := new(SnapshotWithoutHostnames)
streamOldSnapshot := rlp.NewStream(bytes.NewReader(input), 0)
if errOld = streamOldSnapshot.Decode(snapshotOld); errOld == nil {
var snapshotConverted SnapshotWithHostnames
snapshotConverted.RemovedRaftIds, snapshotConverted.HeadBlockHash = snapshotOld.RemovedRaftIds, snapshotOld.HeadBlockHash
snapshotConverted.Addresses = make([]Address, len(snapshotOld.Addresses))
for index, oldAddrWithIp := range snapshotOld.Addresses {
snapshotConverted.Addresses[index] = Address{
RaftId: oldAddrWithIp.RaftId,
NodeId: oldAddrWithIp.NodeId,
Ip: nil,
P2pPort: oldAddrWithIp.P2pPort,
RaftPort: oldAddrWithIp.RaftPort,
Hostname: oldAddrWithIp.Ip.String(),
2017-08-28 14:43:44 -07:00
return &snapshotConverted
2017-08-28 14:43:44 -07:00
fatalf("failed to RLP-decode Snapshot: %v, %v", err, errOld)
return nil
func (snapshot *SnapshotWithHostnames) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{snapshot.Addresses, snapshot.RemovedRaftIds, snapshot.HeadBlockHash})
2017-08-28 14:43:44 -07:00
// Raft snapshot
func (pm *ProtocolManager) saveRaftSnapshot(snap raftpb.Snapshot) error {
if err := pm.snapshotter.SaveSnap(snap); err != nil {
return err
walSnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
if err := pm.wal.SaveSnapshot(walSnap); err != nil {
return err
return pm.wal.ReleaseLockTo(snap.Metadata.Index)
func (pm *ProtocolManager) readRaftSnapshot() *raftpb.Snapshot {
snapshot, err := pm.snapshotter.Load()
if err != nil && err != snap.ErrNoSnapshot {
fatalf("error loading snapshot: %v", err)
return snapshot
2017-08-28 14:43:44 -07:00
// applyRaftSnapshot installs raftSnapshot: it applies it to raft storage,
// decodes its data with bytesToSnapshot, updates cluster membership from the
// decoded addresses and removed IDs, checks whether the snapshot's head block
// is already known locally, and finally records the snapshot's ConfState and
// index on the manager.
//
// NOTE(review): the statements that follow the log line in the "head block not
// found" branch are not visible in this copy; presumably that is where the
// chain is synchronised (preSyncHead is captured just before but is not used
// in the visible lines) — confirm against the original file.
func (pm *ProtocolManager) applyRaftSnapshot(raftSnapshot raftpb.Snapshot) {
log.Info("applying snapshot to raft storage")
2017-08-28 14:43:44 -07:00
if err := pm.raftStorage.ApplySnapshot(raftSnapshot); err != nil {
fatalf("failed to apply snapshot: %s", err)
2017-08-28 14:43:44 -07:00
snapshot := bytesToSnapshot(raftSnapshot.Data)
latestBlockHash := snapshot.HeadBlockHash
2017-08-28 14:43:44 -07:00
pm.updateClusterMembership(raftSnapshot.Metadata.ConfState, snapshot.Addresses, snapshot.RemovedRaftIds)
2017-08-28 14:43:44 -07:00
// Remember the local head as it was before any synchronisation.
preSyncHead := pm.blockchain.CurrentBlock()
2017-08-28 14:43:44 -07:00
// A nil lookup means the snapshot's head block is not in the local chain.
if latestBlock := pm.blockchain.GetBlockByHash(latestBlockHash); latestBlock == nil {
log.Info(chainExtensionMessage, "hash", pm.blockchain.CurrentBlock().Hash())
2017-08-28 14:43:44 -07:00
} else {
// added for permissions changes to indicate node sync up has started
log.Info("blockchain is caught up; no need to synchronize")
2017-08-28 14:43:44 -07:00
2017-08-28 14:43:44 -07:00
snapMeta := raftSnapshot.Metadata
pm.confState = snapMeta.ConfState
2017-08-28 14:43:44 -07:00
pm.snapshotIndex = snapMeta.Index
2017-08-28 14:43:44 -07:00
// syncBlockchainUntil repeatedly asks the downloader to synchronise to the
// given hash, trying each known peer in turn and sleeping 500ms after a failed
// attempt.
//
// NOTE(review): the else-branch body and the loop exits are not visible in
// this copy; presumably a successful Synchronise ends the function — confirm
// against the original file.
func (pm *ProtocolManager) syncBlockchainUntil(hash common.Hash) {
// Work from a private copy of the peer map taken at entry.
peerMap := make(map[uint16]*Peer, len(pm.peers))
for raftId, peer := range pm.peers {
peerMap[raftId] = peer
for {
for peerId, peer := range peerMap {
log.Info("synchronizing with peer", "peer id", peerId, "hash", hash)
2017-08-28 14:43:44 -07:00
// From here on peerId is shadowed: it is the p2p node ID string, not the raft ID.
peerId := peer.p2pNode.ID().String()
// The downloader is addressed by the hex form of the node ID's first 8 bytes.
peerIdPrefix := fmt.Sprintf("%x", peer.p2pNode.ID().Bytes()[:8])
2017-08-28 14:43:44 -07:00
if err := pm.downloader.Synchronise(peerIdPrefix, hash, big.NewInt(0), downloader.BoundedFullSync); err != nil {
log.Info("failed to synchronize with peer", "peer id", peerId)
2017-08-28 14:43:44 -07:00
time.Sleep(500 * time.Millisecond)
} else {
// logNewlyAcceptedTransactions emits a TxAccepted checkpoint for every
// transaction in the blocks between preSyncHead (exclusive) and the current
// head (inclusive). Blocks are collected by walking parent hashes back from
// the head and are stored so that they are iterated oldest first.
//
// NOTE(review): the walk assumes preSyncHead is an ancestor of the current
// head and that the head's number is not below preSyncHead's; otherwise the
// unsigned subtraction, the slice index, or the parent lookup could misbehave
// — confirm callers guarantee this. The closing braces are not visible in this
// copy.
func (pm *ProtocolManager) logNewlyAcceptedTransactions(preSyncHead *types.Block) {
newHead := pm.blockchain.CurrentBlock()
numBlocks := newHead.NumberU64() - preSyncHead.NumberU64()
blocks := make([]*types.Block, numBlocks)
currBlock := newHead
blocksSeen := 0
for currBlock.Hash() != preSyncHead.Hash() {
// Fill from the back so the slice ends up in ascending block order.
blocks[int(numBlocks)-(1+blocksSeen)] = currBlock
blocksSeen += 1
currBlock = pm.blockchain.GetBlockByHash(currBlock.ParentHash())
for _, block := range blocks {
for _, tx := range block.Transactions() {
2018-02-01 14:18:12 -08:00
log.EmitCheckpoint(log.TxAccepted, "tx", tx.Hash().Hex())
2017-08-28 14:43:44 -07:00