DNS support for static nodes (#885)

* Update raft to accept a hostname instead of only an IP address
* Add a switch to enable DNS changes in raft to ensure default behavior is backwards-compatible with older versions.
* Allow P2P layer to use the FQDN
This commit is contained in:
Peter Fox 2019-11-29 14:42:02 +00:00 committed by Samer Falah
parent 2c7fc6c3e9
commit e3089e30be
15 changed files with 300 additions and 128 deletions

View File

@ -217,6 +217,7 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth
blockTimeMillis := ctx.GlobalInt(utils.RaftBlockTimeFlag.Name)
datadir := ctx.GlobalString(utils.DataDirFlag.Name)
joinExistingId := ctx.GlobalInt(utils.RaftJoinExistingFlag.Name)
useDns := ctx.GlobalBool(utils.RaftDNSEnabledFlag.Name)
raftPort := uint16(ctx.GlobalInt(utils.RaftPortFlag.Name))
@ -255,7 +256,7 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth
}
ethereum := <-ethChan
return raft.New(ctx, ethereum.ChainConfig(), myId, raftPort, joinExisting, blockTimeNanos, ethereum, peers, datadir)
return raft.New(ctx, ethereum.ChainConfig(), myId, raftPort, joinExisting, blockTimeNanos, ethereum, peers, datadir, useDns)
}); err != nil {
utils.Fatalf("Failed to register the Raft service: %v", err)
}

View File

@ -135,14 +135,17 @@ var (
utils.EWASMInterpreterFlag,
utils.EVMInterpreterFlag,
configFileFlag,
// Quorum
utils.EnableNodePermissionFlag,
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
utils.RaftDNSEnabledFlag,
utils.EmitCheckpointsFlag,
utils.IstanbulRequestTimeoutFlag,
utils.IstanbulBlockPeriodFlag,
// End-Quorum
}
rpcFlags = []cli.Flag{

View File

@ -161,6 +161,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
utils.RaftDNSEnabledFlag,
},
},
{

View File

@ -603,6 +603,10 @@ var (
Usage: "The port to bind for the raft transport",
Value: 50400,
}
RaftDNSEnabledFlag = cli.BoolFlag{
Name: "raftdnsenable",
Usage: "Enable DNS resolution of peers",
}
// Quorum
EnableNodePermissionFlag = cli.BoolFlag{

23
docs/Features/dns.md Normal file
View File

@ -0,0 +1,23 @@
# DNS for Quorum
DNS support in Quorum has two distinct areas, usage in the static nodes file and usage in the
node discovery protocol. You are free to use one and not the other, or to mix them as the use case
requires.
## Static nodes
Static nodes are nodes we keep reference to even if the node is not alive, so that is the nodes comes alive,
then we can connect to it. Hostnames are permitted here, and are resolved once at startup. If a static peer goes offline
and its IP address changes, then it is expected that that peer would re-establish the connection in a fully static
network, or have discovery enabled.
## Discovery
DNS is not supported for the discovery protocol. Use a bootnode instead, which can use a DNS name that is repeatedly
resolved.
## Compatibility
For Raft, the whole network must be on version 2.3.1 of Quorum for DNS to function properly; because of this, DNS must
be explicitly enabled using the `--raftdnsenable` flag.
The network will support older nodes mixed with newer nodes if DNS is not enabled via this flag, and it is safe to
enable DNS only on some nodes if all nodes are on at least version 2.3.1. This allows for a clear upgrade path.

View File

@ -75,7 +75,9 @@ nav:
- Overview: Cakeshop/Overview.md
- Getting Started: Cakeshop/Getting started.md
- Cakeshop FAQ: Cakeshop/Cakeshop FAQ.md
- Product Roadmap: roadmap.md
- Quorum Features:
- DNS: Features/dns.md
- Product Roadmap: roadmap.md
- FAQ: FAQ.md
theme:

View File

@ -311,6 +311,7 @@ func TestUDP_findnodeMultiReply(t *testing.T) {
rpclist := make([]rpcNode, len(list))
for i := range list {
rpclist[i] = nodeToRPC(list[i])
list[i] = wrapNode(enode.NewV4(list[i].Pubkey(), list[i].IP(), list[i].TCP(), list[i].UDP(), 0))
}
test.packetIn(nil, neighborsPacket, &neighbors{Expiration: futureExp, Nodes: rpclist[:2]})
test.packetIn(nil, neighborsPacket, &neighbors{Expiration: futureExp, Nodes: rpclist[2:]})

View File

@ -21,6 +21,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"math/bits"
"math/rand"
"net"
@ -70,11 +71,36 @@ func (n *Node) Load(k enr.Entry) error {
// IP returns the IP address of the node.
func (n *Node) IP() net.IP {
// QUORUM
// no host is set, so use the IP directly
if n.Host() == "" {
return n.loadIP()
}
// attempt to look up IP addresses if host is a FQDN
lookupIPs, err := net.LookupIP(n.Host())
if err != nil {
log.Debug("hostname couldn't resolve, using IP instead", "hostname", n.Host(), "err", err.Error())
return n.loadIP()
}
// set to first ip by default
return lookupIPs[0]
// END QUORUM
}
func (n *Node) loadIP() net.IP {
var ip net.IP
n.Load((*enr.IP)(&ip))
return ip
}
// Quorum
func (n *Node) Host() string {
var hostname string
n.Load((*enr.Hostname)(&hostname))
return hostname
}
// End-Quorum
// UDP returns the UDP port of the node.
func (n *Node) UDP() int {
var port enr.UDP

View File

@ -84,6 +84,12 @@ func NewV4(pubkey *ecdsa.PublicKey, ip net.IP, tcp, udp, raftPort int) *Node {
if ip != nil {
r.Set(enr.IP(ip))
}
return newV4(pubkey, r, tcp, udp, raftPort)
}
// broken out from `func NewV4` (above) same in upstream go-ethereum, but taken out
// to avoid code duplication b/t NewV4 and NewV4Hostname
func newV4(pubkey *ecdsa.PublicKey, r enr.Record, tcp, udp, raftPort int) *Node {
if udp != 0 {
r.Set(enr.UDP(udp))
}
@ -91,7 +97,7 @@ func NewV4(pubkey *ecdsa.PublicKey, ip net.IP, tcp, udp, raftPort int) *Node {
r.Set(enr.TCP(tcp))
}
if raftPort != 0 {
if raftPort != 0 { // Quorum
r.Set(enr.RaftPort(raftPort))
}
@ -103,10 +109,24 @@ func NewV4(pubkey *ecdsa.PublicKey, ip net.IP, tcp, udp, raftPort int) *Node {
return n
}
// Quorum
// NewV4Hostname creates a node from discovery v4 node information. The record
// contained in the node has a zero-length signature. It sets the hostname of
// the node instead of the IP address.
func NewV4Hostname(pubkey *ecdsa.PublicKey, hostname string, tcp, udp, raftPort int) *Node {
var r enr.Record
if hostname != "" {
r.Set(enr.Hostname(hostname))
}
return newV4(pubkey, r, tcp, udp, raftPort)
}
// End-Quorum
func parseComplete(rawurl string) (*Node, error) {
var (
id *ecdsa.PublicKey
ip net.IP
tcpPort, udpPort uint64
)
u, err := url.Parse(rawurl)
@ -123,20 +143,8 @@ func parseComplete(rawurl string) (*Node, error) {
if id, err = parsePubkey(u.User.String()); err != nil {
return nil, fmt.Errorf("invalid node ID (%v)", err)
}
// Parse the IP address.
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
return nil, fmt.Errorf("invalid host: %v", err)
}
if ip = net.ParseIP(host); ip == nil {
return nil, errors.New("invalid IP address")
}
// Ensure the IP is 4 bytes long for IPv4 addresses.
if ipv4 := ip.To4(); ipv4 != nil {
ip = ipv4
}
// Parse the port numbers.
if tcpPort, err = strconv.ParseUint(port, 10, 16); err != nil {
if tcpPort, err = strconv.ParseUint(u.Port(), 10, 16); err != nil {
return nil, errors.New("invalid port")
}
udpPort = tcpPort
@ -150,17 +158,19 @@ func parseComplete(rawurl string) (*Node, error) {
var node *Node
// Quorum
if qv.Get("raftport") != "" {
raftPort, err := strconv.ParseUint(qv.Get("raftport"), 10, 16)
if err != nil {
return nil, errors.New("invalid raftport in query")
}
node = NewV4(id, ip, int(tcpPort), int(udpPort), int(raftPort))
node = NewV4Hostname(id, u.Hostname(), int(tcpPort), int(udpPort), int(raftPort))
} else {
node = NewV4(id, ip, int(tcpPort), int(udpPort), 0)
node = NewV4Hostname(id, u.Hostname(), int(tcpPort), int(udpPort), 0)
}
return node, nil
// End-Quorum
return node, nil
}
func HexPubkey(h string) (*ecdsa.PublicKey, error) {

View File

@ -20,7 +20,6 @@ import (
"bytes"
"crypto/ecdsa"
"math/big"
"net"
"reflect"
"strings"
"testing"
@ -41,10 +40,6 @@ var parseNodeTests = []struct {
wantError: `invalid node ID (wrong length, want 128 hex chars)`,
},
// Complete nodes with IP address.
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@hostname:3",
wantError: `invalid IP address`,
},
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:foo",
wantError: `invalid port`,
@ -55,9 +50,9 @@ var parseNodeTests = []struct {
},
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150",
wantResult: NewV4(
wantResult: NewV4Hostname(
hexPubkey("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{0x7f, 0x0, 0x0, 0x1},
"127.0.0.1",
52150,
52150,
0,
@ -65,9 +60,9 @@ var parseNodeTests = []struct {
},
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@[::]:52150",
wantResult: NewV4(
wantResult: NewV4Hostname(
hexPubkey("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.ParseIP("::"),
"::",
52150,
52150,
0,
@ -75,9 +70,9 @@ var parseNodeTests = []struct {
},
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@[2001:db8:3c4d:15::abcd:ef12]:52150",
wantResult: NewV4(
wantResult: NewV4Hostname(
hexPubkey("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.ParseIP("2001:db8:3c4d:15::abcd:ef12"),
"2001:db8:3c4d:15::abcd:ef12",
52150,
52150,
0,
@ -85,9 +80,9 @@ var parseNodeTests = []struct {
},
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150?discport=22334",
wantResult: NewV4(
wantResult: NewV4Hostname(
hexPubkey("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{0x7f, 0x0, 0x0, 0x1},
"127.0.0.1",
52150,
22334,
0,
@ -133,7 +128,20 @@ func hexPubkey(h string) *ecdsa.PublicKey {
}
func TestParseNode(t *testing.T) {
for _, test := range parseNodeTests {
extraTests := []struct {
rawurl string
wantError string
wantResult *Node
}{
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@hostname:3",
wantResult: NewV4Hostname(hexPubkey("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), "hostname", 3, 3, 0, ),
},
}
testNodes := append(parseNodeTests, extraTests...)
for _, test := range testNodes {
n, err := ParseV4(test.rawurl)
if test.wantError != "" {
if err == nil {

View File

@ -82,6 +82,10 @@ type RaftPort uint16
func (v RaftPort) ENRKey() string { return "raftport" }
type Hostname string
func (v Hostname) ENRKey() string { return "hostname" }
// EncodeRLP implements rlp.Encoder.
func (v IP) EncodeRLP(w io.Writer) error {
if ip4 := net.IP(v).To4(); ip4 != nil {

View File

@ -39,7 +39,7 @@ type RaftService struct {
calcGasLimitFunc func(block *types.Block) uint64
}
func New(ctx *node.ServiceContext, chainConfig *params.ChainConfig, raftId, raftPort uint16, joinExisting bool, blockTime time.Duration, e *eth.Ethereum, startPeers []*enode.Node, datadir string) (*RaftService, error) {
func New(ctx *node.ServiceContext, chainConfig *params.ChainConfig, raftId, raftPort uint16, joinExisting bool, blockTime time.Duration, e *eth.Ethereum, startPeers []*enode.Node, datadir string, useDns bool) (*RaftService, error) {
service := &RaftService{
eventMux: ctx.EventMux,
chainDb: e.ChainDb(),
@ -55,7 +55,7 @@ func New(ctx *node.ServiceContext, chainConfig *params.ChainConfig, raftId, raft
service.minter = newMinter(chainConfig, service, blockTime)
var err error
if service.raftProtocolManager, err = NewProtocolManager(raftId, raftPort, service.blockchain, service.eventMux, startPeers, joinExisting, datadir, service.minter, service.downloader); err != nil {
if service.raftProtocolManager, err = NewProtocolManager(raftId, raftPort, service.blockchain, service.eventMux, startPeers, joinExisting, datadir, service.minter, service.downloader, useDns); err != nil {
return nil, err
}

View File

@ -3,6 +3,7 @@ package raft
import (
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
@ -59,6 +60,7 @@ type ProtocolManager struct {
// P2P transport
p2pServer *p2p.Server // Initialized in start()
useDns bool
// Blockchain services
blockchain *core.BlockChain
@ -97,7 +99,7 @@ type ProtocolManager struct {
// Public interface
//
func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockChain, mux *event.TypeMux, bootstrapNodes []*enode.Node, joinExisting bool, datadir string, minter *minter, downloader *downloader.Downloader) (*ProtocolManager, error) {
func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockChain, mux *event.TypeMux, bootstrapNodes []*enode.Node, joinExisting bool, datadir string, minter *minter, downloader *downloader.Downloader, useDns bool) (*ProtocolManager, error) {
waldir := fmt.Sprintf("%s/raft-wal", datadir)
snapdir := fmt.Sprintf("%s/raft-snap", datadir)
quorumRaftDbLoc := fmt.Sprintf("%s/quorum-raft-state", datadir)
@ -123,6 +125,7 @@ func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockCh
raftStorage: etcdRaft.NewMemoryStorage(),
minter: minter,
downloader: downloader,
useDns: useDns,
}
if db, err := openQuorumRaftDb(quorumRaftDbLoc); err != nil {
@ -303,12 +306,15 @@ func (pm *ProtocolManager) isNodeAlreadyInCluster(node *enode.Node) error {
}
func (pm *ProtocolManager) ProposeNewPeer(enodeId string) (uint16, error) {
parsedUrl, _ := url.Parse(enodeId)
node, err := enode.ParseV4(enodeId)
if err != nil {
return 0, err
}
if len(node.IP()) != 4 {
//use the hostname instead of the IP, since if DNS is not enabled, the hostname should *be* the IP
ip := net.ParseIP(parsedUrl.Hostname())
if !pm.useDns && (len(ip.To4()) != 4) {
return 0, fmt.Errorf("expected IPv4 address (with length 4), but got IP of length %v", len(node.IP()))
}
@ -321,12 +327,12 @@ func (pm *ProtocolManager) ProposeNewPeer(enodeId string) (uint16, error) {
}
raftId := pm.nextRaftId()
address := newAddress(raftId, node.RaftPort(), node)
address := newAddress(raftId, node.RaftPort(), node, pm.useDns)
pm.confChangeProposalC <- raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: uint64(raftId),
Context: address.toBytes(),
Context: address.toBytes(pm.useDns),
}
return raftId, nil
@ -509,7 +515,7 @@ func (pm *ProtocolManager) setLocalAddress(addr *Address) {
// By setting `URLs` on the raft transport, we advertise our URL (in an HTTP
// header) to any recipient. This is necessary for a newcomer to the cluster
// to be able to accept a snapshot from us to bootstrap them.
if urls, err := raftTypes.NewURLs([]string{raftUrl(addr)}); err == nil {
if urls, err := raftTypes.NewURLs([]string{pm.raftUrl(addr)}); err == nil {
pm.transport.URLs = urls
} else {
panic(fmt.Sprintf("error: could not create URL from local address: %v", addr))
@ -638,8 +644,21 @@ func (pm *ProtocolManager) entriesToApply(allEntries []raftpb.Entry) (entriesToA
return
}
func raftUrl(address *Address) string {
return fmt.Sprintf("http://%s:%d", address.Ip, address.RaftPort)
func (pm *ProtocolManager) raftUrl(address *Address) string {
if !pm.useDns {
parsedIp := net.ParseIP(address.Hostname)
return fmt.Sprintf("http://%s:%d", parsedIp.To4(), address.RaftPort)
}
if parsedIp := net.ParseIP(address.Hostname); parsedIp != nil {
if ipv4 := parsedIp.To4(); ipv4 != nil {
//this is an IPv4 address
return fmt.Sprintf("http://%s:%d", ipv4, address.RaftPort)
}
//this is an IPv6 address
return fmt.Sprintf("http://[%s]:%d", parsedIp, address.RaftPort)
}
return fmt.Sprintf("http://%s:%d", address.Hostname, address.RaftPort)
}
func (pm *ProtocolManager) addPeer(address *Address) {
@ -656,11 +675,11 @@ func (pm *ProtocolManager) addPeer(address *Address) {
}
// Add P2P connection:
p2pNode := enode.NewV4(pubKey, address.Ip, int(address.P2pPort), 0, int(address.RaftPort))
p2pNode := enode.NewV4Hostname(pubKey, address.Hostname, int(address.P2pPort), 0, int(address.RaftPort))
pm.p2pServer.AddPeer(p2pNode)
// Add raft transport connection:
pm.transport.AddPeer(raftTypes.ID(raftId), []string{raftUrl(address)})
pm.transport.AddPeer(raftTypes.ID(raftId), []string{pm.raftUrl(address)})
pm.peers[raftId] = &Peer{address, p2pNode}
}
@ -848,10 +867,10 @@ func (pm *ProtocolManager) makeInitialRaftPeers() (raftPeers []etcdRaft.Peer, pe
// We initially get the raftPort from the enode ID's query string. As an alternative, we can move away from
// requiring the use of static peers for the initial set, and load them from e.g. another JSON file which
// contains pairs of enodes and raft ports, or we can get this initial peer list from commandline flags.
address := newAddress(raftId, node.RaftPort(), node)
address := newAddress(raftId, node.RaftPort(), node, pm.useDns)
raftPeers[i] = etcdRaft.Peer{
ID: uint64(raftId),
Context: address.toBytes(),
Context: address.toBytes(pm.useDns),
}
if raftId == pm.raftId {

View File

@ -1,12 +1,12 @@
package raft
import (
"io"
"net"
"bytes"
"fmt"
"log"
"net"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
@ -18,23 +18,39 @@ import (
type Address struct {
RaftId uint16 `json:"raftId"`
NodeId enode.EnodeID `json:"nodeId"`
Ip net.IP `json:"ip"`
Ip net.IP `json:"-"`
P2pPort enr.TCP `json:"p2pPort"`
RaftPort enr.RaftPort `json:"raftPort"`
Hostname string `json:"hostname"`
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `json:"-" rlp:"tail"`
}
func newAddress(raftId uint16, raftPort int, node *enode.Node) *Address {
func newAddress(raftId uint16, raftPort int, node *enode.Node, withHostname bool) *Address {
// derive 64 byte nodeID from 128 byte enodeID
id, err := enode.RaftHexID(node.EnodeID())
if err != nil {
panic(err)
}
if withHostname {
return &Address{
RaftId: raftId,
NodeId: id,
Ip: nil,
P2pPort: enr.TCP(node.TCP()),
RaftPort: enr.RaftPort(raftPort),
Hostname: node.Host(),
}
}
return &Address{
RaftId: raftId,
NodeId: id,
Ip: node.IP(),
Ip: nil,
P2pPort: enr.TCP(node.TCP()),
RaftPort: enr.RaftPort(raftPort),
Hostname: node.IP().String(),
}
}
@ -44,12 +60,32 @@ type Peer struct {
p2pNode *enode.Node // For ethereum transport
}
func (addr *Address) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{addr.RaftId, addr.NodeId, addr.Ip, addr.P2pPort, addr.RaftPort})
// RLP Address encoding, for transport over raft and storage in LevelDB.
func (addr *Address) toBytes(withHostname bool) []byte {
var toEncode interface{}
if withHostname {
toEncode = addr
} else {
toEncode = []interface{}{addr.RaftId, addr.NodeId, net.ParseIP(addr.Hostname), addr.P2pPort, addr.RaftPort}
}
buffer, err := rlp.EncodeToBytes(toEncode)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Address: %s", err.Error()))
}
return buffer
}
func (addr *Address) DecodeRLP(s *rlp.Stream) error {
// These fields need to be public:
func bytesToAddress(input []byte) *Address {
//try the new format first
addr := new(Address)
streamNew := rlp.NewStream(bytes.NewReader(input), 0)
if err := streamNew.Decode(addr); err == nil {
return addr
}
//else try the old format
var temp struct {
RaftId uint16
NodeId enode.EnodeID
@ -58,31 +94,17 @@ func (addr *Address) DecodeRLP(s *rlp.Stream) error {
RaftPort enr.RaftPort
}
if err := s.Decode(&temp); err != nil {
return err
} else {
addr.RaftId, addr.NodeId, addr.Ip, addr.P2pPort, addr.RaftPort = temp.RaftId, temp.NodeId, temp.Ip, temp.P2pPort, temp.RaftPort
return nil
}
}
// RLP Address encoding, for transport over raft and storage in LevelDB.
func (addr *Address) toBytes() []byte {
size, r, err := rlp.EncodeToReader(addr)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Address: %s", err.Error()))
}
var buffer = make([]byte, uint32(size))
r.Read(buffer)
return buffer
}
func bytesToAddress(bytes []byte) *Address {
var addr Address
if err := rlp.DecodeBytes(bytes, &addr); err != nil {
streamOld := rlp.NewStream(bytes.NewReader(input), 0)
if err := streamOld.Decode(&temp); err != nil {
log.Fatalf("failed to RLP-decode Address: %v", err)
}
return &addr
return &Address{
RaftId: temp.RaftId,
NodeId: temp.NodeId,
Ip: nil,
P2pPort: temp.P2pPort,
RaftPort: temp.RaftPort,
Hostname: temp.Ip.String(),
}
}

View File

@ -1,16 +1,21 @@
package raft
import (
"bytes"
"fmt"
"io"
"math/big"
"net"
"sort"
"time"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal/walpb"
"github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
@ -18,12 +23,24 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
// Snapshot
type SnapshotWithHostnames struct {
Addresses []Address
RemovedRaftIds []uint16
HeadBlockHash common.Hash
}
type Snapshot struct {
addresses []Address
removedRaftIds []uint16 // Raft IDs for permanently removed peers
headBlockHash common.Hash
type AddressWithoutHostname struct {
RaftId uint16
NodeId enode.EnodeID
Ip net.IP
P2pPort enr.TCP
RaftPort enr.RaftPort
}
type SnapshotWithoutHostnames struct {
Addresses []AddressWithoutHostname
RemovedRaftIds []uint16 // Raft IDs for permanently removed peers
HeadBlockHash common.Hash
}
type ByRaftId []Address
@ -32,17 +49,17 @@ func (a ByRaftId) Len() int { return len(a) }
func (a ByRaftId) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByRaftId) Less(i, j int) bool { return a[i].RaftId < a[j].RaftId }
func (pm *ProtocolManager) buildSnapshot() *Snapshot {
func (pm *ProtocolManager) buildSnapshot() *SnapshotWithHostnames {
pm.mu.RLock()
defer pm.mu.RUnlock()
numNodes := len(pm.confState.Nodes)
numRemovedNodes := pm.removedPeers.Cardinality()
snapshot := &Snapshot{
addresses: make([]Address, numNodes),
removedRaftIds: make([]uint16, numRemovedNodes),
headBlockHash: pm.blockchain.CurrentBlock().Hash(),
snapshot := &SnapshotWithHostnames{
Addresses: make([]Address, numNodes),
RemovedRaftIds: make([]uint16, numRemovedNodes),
HeadBlockHash: pm.blockchain.CurrentBlock().Hash(),
}
// Populate addresses
@ -51,17 +68,17 @@ func (pm *ProtocolManager) buildSnapshot() *Snapshot {
raftId := uint16(rawRaftId)
if raftId == pm.raftId {
snapshot.addresses[i] = *pm.address
snapshot.Addresses[i] = *pm.address
} else {
snapshot.addresses[i] = *pm.peers[raftId].address
snapshot.Addresses[i] = *pm.peers[raftId].address
}
}
sort.Sort(ByRaftId(snapshot.addresses))
sort.Sort(ByRaftId(snapshot.Addresses))
// Populate removed IDs
i := 0
for removedIface := range pm.removedPeers.Iterator().C {
snapshot.removedRaftIds[i] = removedIface.(uint16)
snapshot.RemovedRaftIds[i] = removedIface.(uint16)
i++
}
@ -81,7 +98,7 @@ func (pm *ProtocolManager) triggerSnapshot(index uint64) {
//snapData := pm.blockchain.CurrentBlock().Hash().Bytes()
//snap, err := pm.raftStorage.CreateSnapshot(pm.appliedIndex, &pm.confState, snapData)
snapData := pm.buildSnapshot().toBytes()
snapData := pm.buildSnapshot().toBytes(pm.useDns)
snap, err := pm.raftStorage.CreateSnapshot(index, &pm.confState, snapData)
if err != nil {
panic(err)
@ -191,43 +208,74 @@ func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot {
}
}
func (snapshot *Snapshot) toBytes() []byte {
size, r, err := rlp.EncodeToReader(snapshot)
func (snapshot *SnapshotWithHostnames) toBytes(useDns bool) []byte {
// we have DNS enabled, so only use the new snapshot type
if useDns {
buffer, err := rlp.EncodeToBytes(snapshot)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Snapshot: %s", err.Error()))
}
return buffer
}
// DNS is not enabled, use the old snapshot type, converting from hostnames to IP addresses
oldSnapshot := new(SnapshotWithoutHostnames)
oldSnapshot.HeadBlockHash, oldSnapshot.RemovedRaftIds = snapshot.HeadBlockHash, snapshot.RemovedRaftIds
oldSnapshot.Addresses = make([]AddressWithoutHostname, len(snapshot.Addresses))
for index, addrWithHost := range snapshot.Addresses {
oldSnapshot.Addresses[index] = AddressWithoutHostname{
addrWithHost.RaftId,
addrWithHost.NodeId,
net.ParseIP(addrWithHost.Hostname),
addrWithHost.P2pPort,
addrWithHost.RaftPort,
}
}
buffer, err := rlp.EncodeToBytes(oldSnapshot)
if err != nil {
panic(fmt.Sprintf("error: failed to RLP-encode Snapshot: %s", err.Error()))
}
var buffer = make([]byte, uint32(size))
r.Read(buffer)
return buffer
}
func bytesToSnapshot(bytes []byte) *Snapshot {
var snapshot Snapshot
if err := rlp.DecodeBytes(bytes, &snapshot); err != nil {
fatalf("failed to RLP-decode Snapshot: %v", err)
func bytesToSnapshot(input []byte) *SnapshotWithHostnames {
var err, errOld error
snapshot := new(SnapshotWithHostnames)
streamNewSnapshot := rlp.NewStream(bytes.NewReader(input), 0)
if err = streamNewSnapshot.Decode(snapshot); err == nil {
return snapshot
}
return &snapshot
snapshotOld := new(SnapshotWithoutHostnames)
streamOldSnapshot := rlp.NewStream(bytes.NewReader(input), 0)
if errOld = streamOldSnapshot.Decode(snapshotOld); errOld == nil {
var snapshotConverted SnapshotWithHostnames
snapshotConverted.RemovedRaftIds, snapshotConverted.HeadBlockHash = snapshotOld.RemovedRaftIds, snapshotOld.HeadBlockHash
snapshotConverted.Addresses = make([]Address, len(snapshotOld.Addresses))
for index, oldAddrWithIp := range snapshotOld.Addresses {
snapshotConverted.Addresses[index] = Address{
RaftId: oldAddrWithIp.RaftId,
NodeId: oldAddrWithIp.NodeId,
Ip: nil,
P2pPort: oldAddrWithIp.P2pPort,
RaftPort: oldAddrWithIp.RaftPort,
Hostname: oldAddrWithIp.Ip.String(),
}
}
return &snapshotConverted
}
fatalf("failed to RLP-decode Snapshot: %v, %v", err, errOld)
return nil
}
func (snapshot *Snapshot) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{snapshot.addresses, snapshot.removedRaftIds, snapshot.headBlockHash})
}
func (snapshot *Snapshot) DecodeRLP(s *rlp.Stream) error {
// These fields need to be public:
var temp struct {
Addresses []Address
RemovedRaftIds []uint16
HeadBlockHash common.Hash
}
if err := s.Decode(&temp); err != nil {
return err
} else {
snapshot.addresses, snapshot.removedRaftIds, snapshot.headBlockHash = temp.Addresses, temp.RemovedRaftIds, temp.HeadBlockHash
return nil
}
func (snapshot *SnapshotWithHostnames) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{snapshot.Addresses, snapshot.RemovedRaftIds, snapshot.HeadBlockHash})
}
// Raft snapshot
@ -265,9 +313,9 @@ func (pm *ProtocolManager) applyRaftSnapshot(raftSnapshot raftpb.Snapshot) {
}
snapshot := bytesToSnapshot(raftSnapshot.Data)
latestBlockHash := snapshot.headBlockHash
latestBlockHash := snapshot.HeadBlockHash
pm.updateClusterMembership(raftSnapshot.Metadata.ConfState, snapshot.addresses, snapshot.removedRaftIds)
pm.updateClusterMembership(raftSnapshot.Metadata.ConfState, snapshot.Addresses, snapshot.RemovedRaftIds)
preSyncHead := pm.blockchain.CurrentBlock()