Switched to a pure go networking stack

This commit is contained in:
StephenButtolph 2020-05-17 23:47:43 -04:00
parent 7c4abdea66
commit b4306585a4
24 changed files with 1685 additions and 2480 deletions

View File

@ -226,3 +226,17 @@ func (service *Admin) AliasChain(_ *http.Request, args *AliasChainArgs, reply *A
reply.Success = true
return service.httpServer.AddAliasesWithReadLock("bc/"+chainID.String(), "bc/"+args.Alias)
}
// StacktraceArgs are the arguments for calling Stacktrace
type StacktraceArgs struct{}

// StacktraceReply are the results from calling Stacktrace
type StacktraceReply struct {
	// Stacktrace is the rendered stack dump; Global: true below suggests it
	// covers all goroutines — confirm against logging.Stacktrace.
	Stacktrace string `json:"stacktrace"`
}

// Stacktrace returns the current global stacktrace
func (service *Admin) Stacktrace(_ *http.Request, _ *StacktraceArgs, reply *StacktraceReply) error {
	reply.Stacktrace = logging.Stacktrace{Global: true}.String()
	return nil
}

View File

@ -3,9 +3,44 @@
package chains
import "github.com/ava-labs/gecko/snow/networking"
import (
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/math"
)
// Awaiter can await connections to be connected
type Awaiter interface {
AwaitConnections(awaiting *networking.AwaitingConnections)
// awaiter accumulates the stake weight of connected validators and starts
// the consensus engine once enough weight is connected. It implements the
// network Handler callbacks (Connected/Disconnected).
type awaiter struct {
	vdrs      validators.Set // the beacons whose connections are awaited
	reqWeight uint64         // weight that must be connected before startup
	weight    uint64         // running total of currently connected weight
	ctx       *snow.Context  // chain context; its lock guards engine calls
	eng       common.Engine  // engine whose Startup is triggered
}
// Connected records vdrID's weight as connected. Returns true (so the
// handler is deregistered) once the accumulated weight reaches reqWeight,
// after scheduling engine startup; returns false otherwise.
func (a *awaiter) Connected(vdrID ids.ShortID) bool {
	vdr, ok := a.vdrs.Get(vdrID)
	if !ok {
		// Not one of the validators we're waiting on; keep waiting.
		return false
	}
	weight, err := math.Add64(vdr.Weight(), a.weight)
	a.weight = weight
	// NOTE(review): on Add64 overflow (err != nil) this falls through and
	// starts the engine — presumably overflow implies "more than enough
	// weight"; confirm that is intended.
	if err == nil && a.weight < a.reqWeight {
		return false
	}
	// Startup is run on a fresh goroutine while holding the chain's lock.
	go func() {
		a.ctx.Lock.Lock()
		defer a.ctx.Lock.Unlock()
		a.eng.Startup()
	}()
	return true
}
// Disconnected removes vdrID's weight from the connected-weight tally.
// Always returns false so the handler stays registered.
func (a *awaiter) Disconnected(vdrID ids.ShortID) bool {
	if vdr, ok := a.vdrs.Get(vdrID); ok {
		// BUG FIX: subtract the validator's weight from the running total.
		// The original computed Sub64(vdr.Weight(), a.weight), which
		// underflows (and corrupts the tally) whenever the accumulated
		// weight exceeds this single validator's weight.
		a.weight, _ = math.Sub64(a.weight, vdr.Weight())
	}
	return false
}

View File

@ -13,13 +13,13 @@ import (
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/prefixdb"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/network"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/consensus/snowball"
"github.com/ava-labs/gecko/snow/engine/avalanche"
"github.com/ava-labs/gecko/snow/engine/avalanche/state"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/engine/common/queue"
"github.com/ava-labs/gecko/snow/networking"
"github.com/ava-labs/gecko/snow/networking/handler"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/networking/sender"
@ -102,16 +102,15 @@ type manager struct {
decisionEvents *triggers.EventDispatcher
consensusEvents *triggers.EventDispatcher
db database.Database
chainRouter router.Router // Routes incoming messages to the appropriate chain
sender sender.ExternalSender // Sends consensus messages to other validators
timeoutManager *timeout.Manager // Manages request timeouts when sending messages to other validators
consensusParams avacon.Parameters // The consensus parameters (alpha, beta, etc.) for new chains
validators validators.Manager // Validators validating on this chain
registrants []Registrant // Those notified when a chain is created
nodeID ids.ShortID // The ID of this node
networkID uint32 // ID of the network this node is connected to
awaiter Awaiter // Waits for required connections before running bootstrapping
server *api.Server // Handles HTTP API calls
chainRouter router.Router // Routes incoming messages to the appropriate chain
net network.Network // Sends consensus messages to other validators
timeoutManager *timeout.Manager // Manages request timeouts when sending messages to other validators
consensusParams avacon.Parameters // The consensus parameters (alpha, beta, etc.) for new chains
validators validators.Manager // Validators validating on this chain
registrants []Registrant // Those notified when a chain is created
nodeID ids.ShortID // The ID of this node
networkID uint32 // ID of the network this node is connected to
server *api.Server // Handles HTTP API calls
keystore *keystore.Keystore
sharedMemory *atomic.SharedMemory
@ -133,12 +132,11 @@ func New(
consensusEvents *triggers.EventDispatcher,
db database.Database,
router router.Router,
sender sender.ExternalSender,
net network.Network,
consensusParams avacon.Parameters,
validators validators.Manager,
nodeID ids.ShortID,
networkID uint32,
awaiter Awaiter,
server *api.Server,
keystore *keystore.Keystore,
sharedMemory *atomic.SharedMemory,
@ -158,13 +156,12 @@ func New(
consensusEvents: consensusEvents,
db: db,
chainRouter: router,
sender: sender,
net: net,
timeoutManager: &timeoutManager,
consensusParams: consensusParams,
validators: validators,
nodeID: nodeID,
networkID: networkID,
awaiter: awaiter,
server: server,
keystore: keystore,
sharedMemory: sharedMemory,
@ -390,7 +387,7 @@ func (m *manager) createAvalancheChain(
// Passes messages from the consensus engine to the network
sender := sender.Sender{}
sender.Initialize(ctx, m.sender, m.chainRouter, m.timeoutManager)
sender.Initialize(ctx, m.net, m.chainRouter, m.timeoutManager)
// The engine handles consensus
engine := avaeng.Transitive{
@ -438,17 +435,17 @@ func (m *manager) createAvalancheChain(
m.chainRouter.AddChain(handler)
go ctx.Log.RecoverAndPanic(handler.Dispatch)
awaiting := &networking.AwaitingConnections{
Requested: beacons,
WeightRequired: (3*bootstrapWeight + 3) / 4, // 75% must be connected to
Finish: func() {
ctx.Lock.Lock()
defer ctx.Lock.Unlock()
engine.Startup()
},
reqWeight := (3*bootstrapWeight + 3) / 4
if reqWeight == 0 {
engine.Startup()
} else {
go m.net.RegisterHandler(&awaiter{
vdrs: beacons,
reqWeight: reqWeight, // 75% must be connected to
ctx: ctx,
eng: &engine,
})
}
m.awaiter.AwaitConnections(awaiting)
return nil
}
@ -486,7 +483,7 @@ func (m *manager) createSnowmanChain(
// Passes messages from the consensus engine to the network
sender := sender.Sender{}
sender.Initialize(ctx, m.sender, m.chainRouter, m.timeoutManager)
sender.Initialize(ctx, m.net, m.chainRouter, m.timeoutManager)
bootstrapWeight := uint64(0)
for _, beacon := range beacons.List() {
@ -524,17 +521,17 @@ func (m *manager) createSnowmanChain(
m.chainRouter.AddChain(handler)
go ctx.Log.RecoverAndPanic(handler.Dispatch)
awaiting := &networking.AwaitingConnections{
Requested: beacons,
WeightRequired: (3*bootstrapWeight + 3) / 4, // 75% must be connected to
Finish: func() {
ctx.Lock.Lock()
defer ctx.Lock.Unlock()
engine.Startup()
},
reqWeight := (3*bootstrapWeight + 3) / 4
if reqWeight == 0 {
engine.Startup()
} else {
go m.net.RegisterHandler(&awaiter{
vdrs: beacons,
reqWeight: reqWeight, // 75% must be connected to
ctx: ctx,
eng: &engine,
})
}
m.awaiter.AwaitConnections(awaiting)
return nil
}

View File

@ -78,12 +78,6 @@ func main() {
return
}
log.Debug("Starting servers")
if err := node.MainNode.StartConsensusServer(); err != nil {
log.Fatal("problem starting servers: %s", err)
return
}
defer node.MainNode.Shutdown()
log.Debug("Dispatching node handlers")

View File

@ -1,7 +1,7 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
package network
import (
"github.com/ava-labs/gecko/ids"

View File

@ -1,15 +1,13 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
package network
import (
"errors"
"fmt"
"math"
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/wrappers"
)
@ -23,15 +21,14 @@ var (
type Codec struct{}
// Pack attempts to pack a map of fields into a message.
//
// If a nil error is returned, the message's datastream must be freed manually
func (Codec) Pack(op salticidae.Opcode, fields map[Field]interface{}) (Msg, error) {
func (Codec) Pack(op uint8, fields map[Field]interface{}) (Msg, error) {
message, ok := Messages[op]
if !ok {
return nil, errBadOp
}
p := wrappers.Packer{MaxSize: math.MaxInt32}
p.PackByte(op)
for _, field := range message {
data, ok := fields[field]
if !ok {
@ -40,43 +37,34 @@ func (Codec) Pack(op salticidae.Opcode, fields map[Field]interface{}) (Msg, erro
field.Packer()(&p, data)
}
if p.Errored() { // Prevent the datastream from leaking
return nil, p.Err
}
return &msg{
op: op,
ds: salticidae.NewDataStreamFromBytes(p.Bytes, false),
fields: fields,
}, nil
bytes: p.Bytes,
}, p.Err
}
// Parse attempts to convert a byte stream into a message.
//
// The datastream is not freed.
func (Codec) Parse(op salticidae.Opcode, ds salticidae.DataStream) (Msg, error) {
// Parse attempts to convert bytes into a message.
func (Codec) Parse(b []byte) (Msg, error) {
p := wrappers.Packer{Bytes: b}
op := p.UnpackByte()
message, ok := Messages[op]
if !ok {
return nil, errBadOp
}
size := ds.Size()
byteHandle := ds.GetDataInPlace(size)
p := wrappers.Packer{Bytes: utils.CopyBytes(byteHandle.Get())}
byteHandle.Release()
fields := make(map[Field]interface{}, len(message))
for _, field := range message {
fields[field] = field.Unpacker()(&p)
}
if p.Offset != size {
return nil, errBadLength
if p.Offset != len(b) {
p.Add(fmt.Errorf("expected length %d got %d", len(b), p.Offset))
}
return &msg{
op: op,
ds: ds,
fields: fields,
bytes: b,
}, p.Err
}

View File

@ -1,11 +1,9 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
package network
import (
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/utils/wrappers"
)
@ -138,7 +136,7 @@ func (f Field) String() string {
// Public commands that may be sent between stakers
const (
// Handshake:
GetVersion salticidae.Opcode = iota
GetVersion uint8 = iota
Version
GetPeerList
PeerList
@ -165,7 +163,7 @@ const (
// Defines the messages that can be sent/received with this network
var (
Messages = map[salticidae.Opcode][]Field{
Messages = map[uint8][]Field{
// Handshake:
GetVersion: []Field{},
Version: []Field{NetworkID, MyTime, IP, VersionStr},

26
network/dialer.go Normal file
View File

@ -0,0 +1,26 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package network
import (
"net"
"github.com/ava-labs/gecko/utils"
)
// Dialer establishes outbound connections to peers.
type Dialer interface {
	// Dial connects to the given IP, blocking until connected or errored.
	Dial(utils.IPDesc) (net.Conn, error)
}

// dialer dials over a fixed network type (e.g. "tcp").
type dialer struct {
	network string
}

// NewDialer returns a Dialer that dials over the named network.
func NewDialer(network string) Dialer { return &dialer{network: network} }

// Dial opens a connection to ip via net.Dial on the configured network.
func (d *dialer) Dial(ip utils.IPDesc) (net.Conn, error) {
	return net.Dial(d.network, ip.String())
}

14
network/handler.go Normal file
View File

@ -0,0 +1,14 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package network
import "github.com/ava-labs/gecko/ids"
// Handler represents a handler that is called when a connection is marked as
// connected and disconnected
type Handler interface {
	// Connected is invoked when the peer with the given ID completes its
	// handshake; returns true if the handler should be removed
	Connected(id ids.ShortID) bool
	// Disconnected is invoked when the peer with the given ID disconnects;
	// returns true if the handler should be removed.
	Disconnected(id ids.ShortID) bool
}

View File

@ -1,30 +1,26 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
import (
"github.com/ava-labs/salticidae-go"
)
package network
// Msg represents a set of fields that can be serialized into a byte stream
type Msg interface {
Op() salticidae.Opcode
Op() uint8
Get(Field) interface{}
DataStream() salticidae.DataStream
Bytes() []byte
}
type msg struct {
op salticidae.Opcode
ds salticidae.DataStream
op uint8
fields map[Field]interface{}
bytes []byte
}
// Field returns the value of the specified field in this message
func (msg *msg) Op() salticidae.Opcode { return msg.op }
func (msg *msg) Op() uint8 { return msg.op }
// Field returns the value of the specified field in this message
func (msg *msg) Get(field Field) interface{} { return msg.fields[field] }
// Bytes returns this message in bytes
func (msg *msg) DataStream() salticidae.DataStream { return msg.ds }
func (msg *msg) Bytes() []byte { return msg.bytes }

700
network/network.go Normal file
View File

@ -0,0 +1,700 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package network
import (
"fmt"
"math"
"net"
"sync"
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/networking/sender"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/random"
"github.com/ava-labs/gecko/utils/timer"
"github.com/ava-labs/gecko/version"
)
// Default tuning parameters used by NewDefaultNetwork.
const (
	defaultInitialReconnectDelay        = time.Second // first retry delay; doubles per attempt
	defaultMaxReconnectDelay            = time.Hour   // retry delay cap
	defaultMaxMessageSize        uint32 = 1 << 21     // 2 MiB per-message limit
	defaultSendQueueSize                = 1 << 10     // buffered messages per peer
	defaultMaxClockDifference           = time.Minute // tolerated peer clock skew
	defaultPeerListGossipSpacing        = time.Minute // interval between PeerList gossips
	defaultPeerListGossipSize           = 100         // peers messaged per gossip round
	defaultPeerListStakerGossipFraction = 2           // 1/fraction of gossip targets are stakers
	defaultGetVersionTimeout            = 2 * time.Second // spacing of GetVersion retries
	defaultAllowPrivateIPs              = true        // whether private IPs may be tracked
	defaultGossipSize                   = 50          // peers sampled for container gossip
)
// Network is a peer-to-peer network that also implements the consensus
// message sender interface.
type Network interface {
	sender.ExternalSender

	// Dispatch accepts inbound connections until the listener fails.
	// Returns non-nil error
	Dispatch() error

	// Must be thread safe. Will never stop attempting to connect to this IP
	Track(ip utils.IPDesc)

	// RegisterHandler registers h for connect/disconnect events.
	// Calls Connected on all the currently connected peers
	RegisterHandler(h Handler)

	// IPs returns the IPs of the currently connected peers.
	IPs() []utils.IPDesc

	// Close shuts down the listener and all peer connections.
	// Must be thread safe
	Close() error
}
// network is the concrete Network implementation. Mutable connection state
// (closed, disconnectedIPs, connectedIPs, peers, handlers) is guarded by
// stateLock; the configuration fields are set once at construction.
type network struct {
	log            logging.Logger
	id             ids.ShortID   // this node's identity
	ip             utils.IPDesc  // this node's advertised IP
	networkID      uint32        // which network (mainnet/testnet/...) we join
	version        version.Version
	parser         version.Parser
	listener       net.Listener  // accepts inbound connections
	dialer         Dialer        // creates outbound connections
	serverUpgrader Upgrader      // upgrades inbound connections
	clientUpgrader Upgrader      // upgrades outbound connections
	vdrs           validators.Set // set of current validators in the AVAnet
	router         router.Router  // receives failure notifications via executor
	clock          timer.Clock

	initialReconnectDelay        time.Duration
	maxReconnectDelay            time.Duration
	maxMessageSize               uint32
	sendQueueSize                int
	maxClockDifference           time.Duration
	peerListGossipSpacing        time.Duration
	peerListGossipSize           int
	peerListStakerGossipFraction int
	getVersionTimeout            time.Duration
	allowPrivateIPs              bool
	gossipSize                   int

	executor timer.Executor // serializes router failure callbacks

	b Builder // message builder

	stateLock       sync.Mutex
	closed          bool
	disconnectedIPs map[string]struct{} // IPs being (re)connected to
	connectedIPs    map[string]struct{} // IPs with an established connection
	peers           map[[20]byte]*peer  // keyed by node ID bytes
	handlers        []Handler           // connect/disconnect subscribers
}
// NewDefaultNetwork returns a Network built with the package's default
// tuning parameters (reconnect delays, message size limits, gossip sizes,
// etc.); see NewNetwork for the fully parameterized constructor.
func NewDefaultNetwork(
	log logging.Logger,
	id ids.ShortID,
	ip utils.IPDesc,
	networkID uint32,
	version version.Version,
	parser version.Parser,
	listener net.Listener,
	dialer Dialer,
	serverUpgrader,
	clientUpgrader Upgrader,
	vdrs validators.Set,
	router router.Router,
) Network {
	return NewNetwork(
		log,
		id,
		ip,
		networkID,
		version,
		parser,
		listener,
		dialer,
		serverUpgrader,
		clientUpgrader,
		vdrs,
		router,
		defaultInitialReconnectDelay,
		defaultMaxReconnectDelay,
		defaultMaxMessageSize,
		defaultSendQueueSize,
		defaultMaxClockDifference,
		defaultPeerListGossipSpacing,
		defaultPeerListGossipSize,
		defaultPeerListStakerGossipFraction,
		defaultGetVersionTimeout,
		defaultAllowPrivateIPs,
		defaultGossipSize,
	)
}
// NewNetwork returns a Network with every tuning parameter supplied
// explicitly. The returned network's executor is started here; the caller
// still needs to invoke Dispatch to begin accepting connections.
func NewNetwork(
	log logging.Logger,
	id ids.ShortID,
	ip utils.IPDesc,
	networkID uint32,
	version version.Version,
	parser version.Parser,
	listener net.Listener,
	dialer Dialer,
	serverUpgrader,
	clientUpgrader Upgrader,
	vdrs validators.Set,
	router router.Router,
	initialReconnectDelay,
	maxReconnectDelay time.Duration,
	maxMessageSize uint32,
	sendQueueSize int,
	maxClockDifference time.Duration,
	peerListGossipSpacing time.Duration,
	peerListGossipSize int,
	peerListStakerGossipFraction int,
	getVersionTimeout time.Duration,
	allowPrivateIPs bool,
	gossipSize int,
) Network {
	net := &network{
		log:                          log,
		id:                           id,
		ip:                           ip,
		networkID:                    networkID,
		version:                      version,
		parser:                       parser,
		listener:                     listener,
		dialer:                       dialer,
		serverUpgrader:               serverUpgrader,
		clientUpgrader:               clientUpgrader,
		vdrs:                         vdrs,
		router:                       router,
		initialReconnectDelay:        initialReconnectDelay,
		maxReconnectDelay:            maxReconnectDelay,
		maxMessageSize:               maxMessageSize,
		sendQueueSize:                sendQueueSize,
		maxClockDifference:           maxClockDifference,
		peerListGossipSpacing:        peerListGossipSpacing,
		peerListGossipSize:           peerListGossipSize,
		peerListStakerGossipFraction: peerListStakerGossipFraction,
		getVersionTimeout:            getVersionTimeout,
		allowPrivateIPs:              allowPrivateIPs,
		gossipSize:                   gossipSize,
		disconnectedIPs:              make(map[string]struct{}),
		connectedIPs:                 make(map[string]struct{}),
		peers:                        make(map[[20]byte]*peer),
	}
	// Start the executor so queued router callbacks get run.
	net.executor.Initialize()
	return net
}
// Dispatch starts the gossip loop and then accepts inbound connections
// forever, upgrading each on its own goroutine. Only returns (with a
// non-nil error) when the listener fails, e.g. after Close.
func (n *network) Dispatch() error {
	go n.gossip()
	for {
		conn, err := n.listener.Accept()
		if err != nil {
			return err
		}
		// Inbound connection: the peer's IP is left zero until it reports
		// one in its Version message.
		go n.upgrade(&peer{
			net:  n,
			conn: conn,
		}, n.serverUpgrader)
	}
}
// RegisterHandler calls h.Connected for this node's own ID and for every
// currently connected peer. If any of those calls returns true the handler
// is considered satisfied and is not registered; otherwise it is retained
// for future connect/disconnect events.
func (n *network) RegisterHandler(h Handler) {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	// This node counts as "connected" to itself.
	if h.Connected(n.id) {
		return
	}
	for _, peer := range n.peers {
		if peer.connected {
			if h.Connected(peer.id) {
				return
			}
		}
	}
	n.handlers = append(n.handlers, h)
}
// IPs returns the IP of every peer whose handshake has completed.
func (n *network) IPs() []utils.IPDesc {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	var result []utils.IPDesc
	for _, p := range n.peers {
		if !p.connected {
			continue
		}
		result = append(result, p.ip)
	}
	return result
}
// gossip periodically sends a PeerList message (containing the connected
// validators' IPs) to a random sample of peers, biased so that roughly
// 1/peerListStakerGossipFraction of recipients are stakers. Runs until the
// network is closed.
func (n *network) gossip() {
	t := time.NewTicker(n.peerListGossipSpacing)
	defer t.Stop()

	for range t.C {
		ips := n.validatorIPs()
		if len(ips) == 0 {
			n.log.Debug("skipping validator gossiping as no validators are connected")
			continue
		}
		// Build the message once; the same bytes go to every recipient.
		msg, err := n.b.PeerList(ips)
		if err != nil {
			n.log.Warn("failed to gossip PeerList message due to %s", err)
			continue
		}

		n.stateLock.Lock()
		if n.closed {
			n.stateLock.Unlock()
			return
		}

		// Partition peers into stakers and non-stakers.
		stakers := []*peer(nil)
		nonStakers := []*peer(nil)
		for _, peer := range n.peers {
			if n.vdrs.Contains(peer.id) {
				stakers = append(stakers, peer)
			} else {
				nonStakers = append(nonStakers, peer)
			}
		}

		// Ceiling division: at least this many recipients should be stakers.
		numStakersToSend := (n.peerListGossipSize + n.peerListStakerGossipFraction - 1) / n.peerListStakerGossipFraction
		if len(stakers) < numStakersToSend {
			numStakersToSend = len(stakers)
		}
		// Fill the remainder of the gossip budget with non-stakers.
		numNonStakersToSend := n.peerListGossipSize - numStakersToSend
		if len(nonStakers) < numNonStakersToSend {
			numNonStakersToSend = len(nonStakers)
		}

		// Sample without replacement from each partition.
		sampler := random.Uniform{N: len(stakers)}
		for i := 0; i < numStakersToSend; i++ {
			stakers[sampler.Sample()].Send(msg)
		}
		sampler.N = len(nonStakers)
		sampler.Replace()
		for i := 0; i < numNonStakersToSend; i++ {
			nonStakers[sampler.Sample()].Send(msg)
		}
		n.stateLock.Unlock()
	}
}
// Close marks the network closed, shuts down the listener, and closes every
// peer connection. Idempotent; returns the listener's close error, if any.
func (n *network) Close() error {
	n.stateLock.Lock()
	if n.closed {
		n.stateLock.Unlock()
		return nil
	}
	n.closed = true
	err := n.listener.Close()

	// BUG FIX: snapshot the peer set while still holding the lock.
	// peer.Close() re-acquires stateLock and deletes the peer from n.peers,
	// and other goroutines may mutate the map too, so ranging over the live
	// map after unlocking was a data race.
	peersToClose := make([]*peer, 0, len(n.peers))
	for _, p := range n.peers {
		peersToClose = append(peersToClose, p)
	}
	n.stateLock.Unlock()

	for _, p := range peersToClose {
		p.Close() // Grabs the stateLock
	}
	return err
}
// Track marks ip as an address this network should keep trying to connect
// to. Safe for concurrent use.
func (n *network) Track(ip utils.IPDesc) {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	n.track(ip)
}
// track starts a reconnect loop for ip unless the network is closed or the
// IP is already being tracked or connected. stateLock must be held.
func (n *network) track(ip utils.IPDesc) {
	if n.closed {
		return
	}

	str := ip.String()
	if _, ok := n.disconnectedIPs[str]; ok {
		// Already attempting to connect to this IP.
		return
	}
	if _, ok := n.connectedIPs[str]; ok {
		// Already connected to this IP.
		return
	}
	n.disconnectedIPs[str] = struct{}{}

	go n.connectTo(ip)
}
// connectTo repeatedly attempts to connect to ip with exponential backoff
// (doubling from initialReconnectDelay up to maxReconnectDelay), stopping
// once connected or once the IP is no longer tracked as disconnected.
func (n *network) connectTo(ip utils.IPDesc) {
	str := ip.String()
	delay := n.initialReconnectDelay
	for {
		n.stateLock.Lock()
		_, isDisconnected := n.disconnectedIPs[str]
		_, isConnected := n.connectedIPs[str]
		n.stateLock.Unlock()

		if !isDisconnected || isConnected {
			// If the IP was discovered by that peer connecting to us, we don't
			// need to attempt to connect anymore
			return
		}

		err := n.attemptConnect(ip)
		if err == nil {
			return
		}
		// TODO: log the connection error before backing off and retrying.

		time.Sleep(delay)
		delay *= 2
		if delay > n.maxReconnectDelay {
			delay = n.maxReconnectDelay
		}
	}
}
// attemptConnect dials ip once and, on success, hands the connection to
// upgrade using the client-side upgrader. Returns the dial/upgrade error.
func (n *network) attemptConnect(ip utils.IPDesc) error {
	n.log.Verbo("attempting to connect to %s", ip)

	conn, err := n.dialer.Dial(ip)
	if err != nil {
		return err
	}
	return n.upgrade(&peer{
		net:  n,
		ip:   ip,
		conn: conn,
	}, n.clientUpgrader)
}
// upgrade runs the identity upgrade on p's connection, registers the peer,
// and starts its read/write goroutines. Returns an error only if the
// upgrade itself fails; duplicate connections and connections arriving
// after Close are dropped silently.
func (n *network) upgrade(p *peer, upgrader Upgrader) error {
	id, conn, err := upgrader.Upgrade(p.conn)
	if err != nil {
		n.log.Verbo("failed to upgrade connection with %s", err)
		return err
	}
	p.sender = make(chan []byte, n.sendQueueSize)
	p.id = id
	p.conn = conn

	key := id.Key()

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	if n.closed {
		// BUG FIX: close the upgraded connection instead of leaking it when
		// the network has already been shut down.
		p.conn.Close()
		return nil
	}
	if _, ok := n.peers[key]; ok {
		// Already have a connection to this node; drop the duplicate.
		p.conn.Close()
		return nil
	}
	n.peers[key] = p
	p.Start()
	return nil
}
// validatorIPs returns the known IPs of connected peers that are currently
// validators.
func (n *network) validatorIPs() []utils.IPDesc {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	var addrs []utils.IPDesc
	for _, p := range n.peers {
		if !p.connected || p.ip.IsZero() || !n.vdrs.Contains(p.id) {
			continue
		}
		addrs = append(addrs, p.ip)
	}
	return addrs
}
// connected marks p's IP as established and notifies every registered
// handler; handlers whose Connected returns true are removed.
// stateLock is held when called
func (n *network) connected(p *peer) {
	n.log.Debug("connected to %s at %s", p.id, p.ip)
	if !p.ip.IsZero() {
		str := p.ip.String()
		delete(n.disconnectedIPs, str)
		n.connectedIPs[str] = struct{}{}
	}
	for i := 0; i < len(n.handlers); {
		if n.handlers[i].Connected(p.id) {
			// BUG FIX: actually shrink the slice. The original swapped in
			// the last handler but never truncated, so handlers were never
			// removed and the final handler was notified twice.
			newLen := len(n.handlers) - 1
			n.handlers[i] = n.handlers[newLen]
			n.handlers[newLen] = nil // release the reference for GC
			n.handlers = n.handlers[:newLen]
		} else {
			i++
		}
	}
}
// disconnected removes p from the peer set, re-tracks its IP for
// reconnection, and (if the handshake had completed) notifies handlers;
// handlers whose Disconnected returns true are removed.
// stateLock is held when called
func (n *network) disconnected(p *peer) {
	n.log.Debug("disconnected from %s at %s", p.id, p.ip)
	key := p.id.Key()
	delete(p.net.peers, key)

	if !p.ip.IsZero() {
		str := p.ip.String()
		delete(n.disconnectedIPs, str)
		delete(n.connectedIPs, str)
		// Keep trying to reconnect to peers whose IP we know.
		p.net.track(p.ip)
	}
	if p.connected {
		for i := 0; i < len(n.handlers); {
			if n.handlers[i].Disconnected(p.id) {
				// BUG FIX: actually shrink the slice. The original swapped
				// in the last handler but never truncated, so handlers were
				// never removed and the final handler was notified twice.
				newLen := len(n.handlers) - 1
				n.handlers[i] = n.handlers[newLen]
				n.handlers[newLen] = nil // release the reference for GC
				n.handlers = n.handlers[:newLen]
			} else {
				i++
			}
		}
	}
}
// Accept is called after every consensus decision; it gossips the decided
// container to a random subset of peers.
func (n *network) Accept(chainID, containerID ids.ID, container []byte) error {
	return n.gossipContainer(chainID, containerID, container)
}
// GetAcceptedFrontier implements the Sender interface. Sends the request to
// each validator; for any validator that cannot be reached, a
// GetAcceptedFrontierFailed callback is queued on the executor so the
// router still observes a response.
func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) {
	msg, err := n.b.GetAcceptedFrontier(chainID, requestID)
	n.log.AssertNoError(err) // fixed-size message; packing cannot fail

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	for _, validatorID := range validatorIDs.List() {
		vID := validatorID // capture for the closure below
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.Send(msg)
		}
		if !sent {
			n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
		}
	}
}
// AcceptedFrontier implements the Sender interface. Best-effort: failures to
// pack or send are logged and otherwise dropped.
func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	msg, err := n.b.AcceptedFrontier(chainID, requestID, containerIDs)
	if err != nil {
		n.log.Error("attempted to pack too large of an AcceptedFrontier message.\nNumber of containerIDs: %d",
			containerIDs.Len())
		return // Packing message failed
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.Send(msg)
	}
	if !sent {
		n.log.Debug("failed to send an AcceptedFrontier message to: %s", validatorID)
	}
}
// GetAccepted implements the Sender interface. If the message cannot be
// packed, or a validator cannot be reached, a GetAcceptedFailed callback is
// queued on the executor for that validator.
func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	msg, err := n.b.GetAccepted(chainID, requestID, containerIDs)
	if err != nil {
		// Packing failed: fail the request for every validator.
		for _, validatorID := range validatorIDs.List() {
			vID := validatorID // capture for the closure below
			n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
		}
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	for _, validatorID := range validatorIDs.List() {
		vID := validatorID // capture for the closure below
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.Send(msg)
		}
		if !sent {
			n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
		}
	}
}
// Accepted implements the Sender interface. Best-effort: failures to pack
// or send are logged and otherwise dropped.
func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	msg, err := n.b.Accepted(chainID, requestID, containerIDs)
	if err != nil {
		n.log.Error("attempted to pack too large of an Accepted message.\nNumber of containerIDs: %d",
			containerIDs.Len())
		return // Packing message failed
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.Send(msg)
	}
	if !sent {
		n.log.Debug("failed to send an Accepted message to: %s", validatorID)
	}
}
// Get implements the Sender interface. Best-effort: a send failure is only
// logged (the router's timeout handling covers the missing response).
func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
	msg, err := n.b.Get(chainID, requestID, containerID)
	n.log.AssertNoError(err) // fixed-size message; packing cannot fail

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.Send(msg)
	}
	if !sent {
		n.log.Debug("failed to send a Get message to: %s", validatorID)
	}
}
// Put implements the Sender interface. Best-effort: failures to pack (e.g.
// container too large) or send are logged and otherwise dropped.
func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
	msg, err := n.b.Put(chainID, requestID, containerID, container)
	if err != nil {
		n.log.Error("failed to build Put message because of container of size %d", len(container))
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.Send(msg)
	}
	if !sent {
		n.log.Debug("failed to send a Put message to: %s", validatorID)
	}
}
// PushQuery implements the Sender interface. If the message cannot be
// packed, or a validator cannot be reached, a QueryFailed callback is
// queued on the executor for that validator.
func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
	msg, err := n.b.PushQuery(chainID, requestID, containerID, container)

	if err != nil {
		// Packing failed: fail the query for every validator.
		for _, validatorID := range validatorIDs.List() {
			vID := validatorID // capture for the closure below
			n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
		}
		n.log.Error("attempted to pack too large of a PushQuery message.\nContainer length: %d", len(container))
		return // Packing message failed
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	for _, validatorID := range validatorIDs.List() {
		vID := validatorID // capture for the closure below
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.Send(msg)
		}
		if !sent {
			n.log.Debug("failed sending a PushQuery message to: %s", vID)
			n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
		}
	}
}
// PullQuery implements the Sender interface. For any validator that cannot
// be reached, a QueryFailed callback is queued on the executor.
func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) {
	msg, err := n.b.PullQuery(chainID, requestID, containerID)
	n.log.AssertNoError(err) // fixed-size message; packing cannot fail

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	for _, validatorID := range validatorIDs.List() {
		vID := validatorID // capture for the closure below
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.Send(msg)
		}
		if !sent {
			n.log.Debug("failed sending a PullQuery message to: %s", vID)
			n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
		}
	}
}
// Chits implements the Sender interface. Best-effort: failures to pack or
// send are logged and otherwise dropped.
func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
	msg, err := n.b.Chits(chainID, requestID, votes)
	if err != nil {
		n.log.Error("failed to build Chits message because of %d votes", votes.Len())
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.Send(msg)
	}
	if !sent {
		n.log.Debug("failed to send a Chits message to: %s", validatorID)
	}
}
// Gossip attempts to gossip the container to the network; errors are logged
// rather than returned.
func (n *network) Gossip(chainID, containerID ids.ID, container []byte) {
	if err := n.gossipContainer(chainID, containerID, container); err != nil {
		n.log.Error("error gossiping container %s to %s: %s", containerID, chainID, err)
	}
}
// gossipContainer sends a Put message (with requestID math.MaxUint32 to
// mark it as gossip rather than a response) to up to gossipSize peers
// sampled uniformly without replacement. Returns an error only if the
// message cannot be packed.
func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte) error {
	msg, err := n.b.Put(chainID, math.MaxUint32, containerID, container)
	if err != nil {
		return fmt.Errorf("attempted to pack too large of a Put message.\nContainer length: %d", len(container))
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	allPeers := make([]*peer, 0, len(n.peers))
	for _, peer := range n.peers {
		allPeers = append(allPeers, peer)
	}

	numToGossip := n.gossipSize
	if numToGossip > len(allPeers) {
		numToGossip = len(allPeers)
	}

	sampler := random.Uniform{N: len(allPeers)}
	for i := 0; i < numToGossip; i++ {
		allPeers[sampler.Sample()].Send(msg)
	}
	return nil
}

456
network/peer.go Normal file
View File

@ -0,0 +1,456 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package network
import (
"bytes"
"math"
"net"
"sync"
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/wrappers"
)
// peer holds the per-connection state for one remote node. connected and
// closed are read/written under net.stateLock elsewhere in this package;
// sender carries outbound message bytes to the write goroutine.
type peer struct {
	net       *network // the network this peer belongs to
	connected bool     // handshake (Version exchange) completed
	closed    bool     // connection has been torn down
	sender    chan []byte // queue drained by WriteMessages
	ip        utils.IPDesc // zero until known (inbound peers report it)
	id        ids.ShortID  // set by the connection upgrade
	conn      net.Conn
	once      sync.Once // guards close()
	b         Builder   // outbound message builder
	c         Codec     // inbound message parser
}
// Start launches the read and write goroutines and sends our Version.
// NOTE(review): requestVersion is defined below but not started here —
// confirm it is triggered elsewhere.
func (p *peer) Start() {
	go p.ReadMessages()
	go p.WriteMessages()

	// Initially send the version to the peer
	p.Version()
}
// requestVersion re-sends GetVersion every getVersionTimeout until the peer
// either completes the handshake or the connection closes.
func (p *peer) requestVersion() {
	t := time.NewTicker(p.net.getVersionTimeout)
	defer t.Stop()

	for range t.C {
		p.net.stateLock.Lock()
		connected := p.connected
		closed := p.closed
		p.net.stateLock.Unlock()

		if connected || closed {
			return
		}
		p.GetVersion()
	}
}
// ReadMessages reads length-prefixed messages from the connection, parses
// each with the codec, and dispatches it to handle. Any read, framing, or
// parse error closes the peer.
func (p *peer) ReadMessages() {
	defer p.Close()

	// pendingBuffer accumulates raw bytes until at least one complete
	// length-prefixed message is available.
	pendingBuffer := wrappers.Packer{}
	readBuffer := make([]byte, 1<<10)
	for {
		read, err := p.conn.Read(readBuffer)
		if err != nil {
			p.net.log.Verbo("error on connection read to %s %s", p.id, err)
			return
		}

		pendingBuffer.Bytes = append(pendingBuffer.Bytes, readBuffer[:read]...)

		// Try to extract one complete message (UnpackBytes reads the length
		// prefix then that many bytes).
		msgBytes := pendingBuffer.UnpackBytes()
		if pendingBuffer.Errored() {
			// if reading the bytes errored, then we haven't read the full
			// message yet
			pendingBuffer.Offset = 0
			pendingBuffer.Err = nil

			if uint32(len(pendingBuffer.Bytes)) > p.net.maxMessageSize+wrappers.IntLen {
				// we have read more bytes than the max message size allows for,
				// so we should terminate this connection
				p.net.log.Verbo("error reading too many bytes on %s %s", p.id, err)
				return
			}

			// we should try to read more bytes to finish the message
			continue
		}

		// we read the full message bytes

		// set the pending bytes to any extra bytes that were read
		pendingBuffer.Bytes = pendingBuffer.Bytes[pendingBuffer.Offset:]
		// set the offset back to the start of the next message
		pendingBuffer.Offset = 0

		if uint32(len(msgBytes)) > p.net.maxMessageSize {
			// if this message is longer than the max message length, then we
			// should terminate this connection
			p.net.log.Verbo("error reading too many bytes on %s %s", p.id, err)
			return
		}

		p.net.log.Verbo("parsing new message from %s:\n%s",
			p.id,
			formatting.DumpBytes{Bytes: msgBytes})

		msg, err := p.c.Parse(msgBytes)
		if err != nil {
			p.net.log.Debug("failed to parse new message from %s:\n%s\n%s",
				p.id,
				formatting.DumpBytes{Bytes: msgBytes},
				err)
			return
		}

		p.handle(msg)
	}
}
// WriteMessages drains the send queue, length-prefixing each message and
// writing it fully to the connection. Exits (closing the peer) on a write
// error or when the sender channel is closed by close().
func (p *peer) WriteMessages() {
	defer p.Close()

	for msg := range p.sender {
		p.net.log.Verbo("sending new message to %s:\n%s",
			p.id,
			formatting.DumpBytes{Bytes: msg})

		// Prepend the 4-byte length prefix expected by ReadMessages.
		packer := wrappers.Packer{Bytes: make([]byte, len(msg)+wrappers.IntLen)}
		packer.PackBytes(msg)
		msg = packer.Bytes
		// Loop until the whole frame is written; Write may be partial.
		for len(msg) > 0 {
			written, err := p.conn.Write(msg)
			if err != nil {
				// TODO: log the write error before tearing down the peer.
				return
			}
			msg = msg[written:]
		}
	}
}
// Send enqueues msg for the write goroutine without blocking. Returns false
// (and logs) when the send queue is full and the message is dropped.
func (p *peer) Send(msg Msg) bool {
	payload := msg.Bytes()
	select {
	case p.sender <- payload:
		return true
	default:
		p.net.log.Debug("dropping message to %s due to a full send queue", p.id)
		return false
	}
}
// handle dispatches a parsed message to the matching handler. Version and
// GetVersion are always processed; everything else is dropped until the
// handshake has completed (p.connected).
func (p *peer) handle(msg Msg) {
	op := msg.Op()
	switch op {
	case Version:
		p.version(msg)
		return
	case GetVersion:
		p.getVersion(msg)
		return
	}
	if !p.connected {
		p.net.log.Debug("dropping message from %s because the connection hasn't been established yet", p.id)
		return
	}
	switch msg.Op() {
	case GetPeerList:
		p.getPeerList(msg)
	case PeerList:
		p.peerList(msg)
	case GetAcceptedFrontier:
		p.getAcceptedFrontier(msg)
	case AcceptedFrontier:
		p.acceptedFrontier(msg)
	case GetAccepted:
		p.getAccepted(msg)
	case Accepted:
		p.accepted(msg)
	case Get:
		p.get(msg)
	case Put:
		p.put(msg)
	case PushQuery:
		p.pushQuery(msg)
	case PullQuery:
		p.pullQuery(msg)
	case Chits:
		p.chits(msg)
	default:
		p.net.log.Debug("dropping an unknown message from %s with op %d", p.id, op)
	}
}
// Close tears the connection down exactly once; safe to call concurrently.
func (p *peer) Close() { p.once.Do(p.close) }

// close marks the peer closed, closes the connection, stops the write
// goroutine (by closing sender), and notifies the network. Only reachable
// via p.once in Close.
func (p *peer) close() {
	p.net.stateLock.Lock()
	defer p.net.stateLock.Unlock()

	p.closed = true
	p.conn.Close()
	close(p.sender)
	p.net.disconnected(p)
}
// GetVersion asks the peer to (re)send its Version message.
func (p *peer) GetVersion() {
	msg, err := p.b.GetVersion()
	p.net.log.AssertNoError(err) // fixed-size message; packing cannot fail
	p.Send(msg)
}
// Version sends our handshake message: network ID, current local time, our
// advertised IP, and version string.
func (p *peer) Version() {
	msg, err := p.b.Version(
		p.net.networkID,
		p.net.clock.Unix(),
		p.net.ip,
		p.net.version.String(),
	)
	p.net.log.AssertNoError(err) // fields are bounded; packing cannot fail
	p.Send(msg)
}
// GetPeerList asks the peer for the IPs of the peers it knows about.
func (p *peer) GetPeerList() {
	msg, err := p.b.GetPeerList()
	p.net.log.AssertNoError(err) // fixed-size message; packing cannot fail
	p.Send(msg)
}
// PeerList sends the provided IPs to this peer in a PeerList message. A
// packing failure is logged and the message skipped rather than asserted,
// since the IP list length is externally influenced.
func (p *peer) PeerList(peers []utils.IPDesc) {
	msg, err := p.b.PeerList(peers)
	if err != nil {
		p.net.log.Warn("failed to send PeerList message due to %s", err)
		return
	}
	// The redundant trailing return after this call was removed (staticcheck S1023).
	p.Send(msg)
}
func (p *peer) getVersion(_ Msg) { p.Version() }
// version handles an inbound Version message. It validates, in order: that we
// haven't already completed the handshake, that the peer is on our network,
// that its clock is within the allowed skew, and that its version parses and
// is compatible with ours. On any validation failure the peer's IP is cleared
// (so we won't reconnect) and the connection is closed. On success the peer is
// marked connected and the network is notified.
func (p *peer) version(msg Msg) {
	if p.connected {
		p.net.log.Verbo("dropping duplicated version message from %s", p.id)
		return
	}

	if networkID := msg.Get(NetworkID).(uint32); networkID != p.net.networkID {
		p.net.log.Debug("peer's network ID doesn't match our networkID: Peer's = %d ; Ours = %d",
			networkID,
			p.net.networkID)

		// By clearing the IP, we will not attempt to reconnect to this peer
		p.ip = utils.IPDesc{}
		p.Close()
		return
	}

	myTime := float64(p.net.clock.Unix())
	if peerTime := float64(msg.Get(MyTime).(uint64)); math.Abs(peerTime-myTime) > p.net.maxClockDifference.Seconds() {
		p.net.log.Debug("peer's clock is too far out of sync with mine. Peer's = %d, Ours = %d (seconds)",
			uint64(peerTime),
			uint64(myTime))

		// By clearing the IP, we will not attempt to reconnect to this peer
		p.ip = utils.IPDesc{}
		p.Close()
		return
	}

	peerVersionStr := msg.Get(VersionStr).(string)
	peerVersion, err := p.net.parser.Parse(peerVersionStr)
	if err != nil {
		p.net.log.Debug("peer version could not be parsed due to %s", err)

		// By clearing the IP, we will not attempt to reconnect to this peer
		p.ip = utils.IPDesc{}
		p.Close()
		return
	}

	// A newer peer version is allowed (only an advisory log); an incompatible
	// one is rejected below.
	if p.net.version.Before(peerVersion) {
		p.net.log.Info("peer attempting to connect with newer version %s. You may want to update your client",
			peerVersion)
	}

	if err := p.net.version.Compatible(peerVersion); err != nil {
		p.net.log.Debug("peer version not compatible due to %s", err)

		// By clearing the IP, we will not attempt to reconnect to this peer
		p.ip = utils.IPDesc{}
		p.Close()
		return
	}

	if p.ip.IsZero() {
		// we only care about the claimed IP if we don't know the IP yet
		peerIP := msg.Get(IP).(utils.IPDesc)

		addr := p.conn.RemoteAddr()
		localPeerIP, err := utils.ToIPDesc(addr.String())
		if err == nil {
			// If we have no clue what the peer's IP is, we can't perform any
			// verification
			if bytes.Equal(peerIP.IP, localPeerIP.IP) {
				// if the IPs match, add this ip:port pair to be tracked
				p.ip = peerIP
			}
		}
	}

	// Marking the peer connected requires the network state lock; bail if the
	// peer was closed while we were validating.
	p.net.stateLock.Lock()
	defer p.net.stateLock.Unlock()

	if p.closed {
		return
	}

	p.connected = true
	p.net.connected(p)
}
// getPeerList handles a GetPeerList request by replying with the IPs of the
// validators this node currently knows about.
func (p *peer) getPeerList(_ Msg) {
	validatorIPs := p.net.validatorIPs()
	if reply, err := p.b.PeerList(validatorIPs); err == nil {
		p.Send(reply)
	} else {
		p.net.log.Warn("failed to send PeerList message due to %s", err)
	}
}
// peerList handles an inbound PeerList message by tracking every advertised
// IP that is not our own, not the zero IP, and (unless private IPs are
// allowed) not private.
func (p *peer) peerList(msg Msg) {
	ips := msg.Get(Peers).([]utils.IPDesc)

	for _, ip := range ips {
		if !ip.Equal(p.net.ip) &&
			!ip.IsZero() &&
			(p.net.allowPrivateIPs || !ip.IsPrivate()) {
			// TODO: this is a vulnerability, perhaps only try to connect once?
			p.net.Track(ip)
		}
	}
}
// getAcceptedFrontier forwards a GetAcceptedFrontier request to the router.
func (p *peer) getAcceptedFrontier(msg Msg) {
	chainIDBytes := msg.Get(ChainID).([]byte)
	chainID, err := ids.ToID(chainIDBytes)
	p.net.log.AssertNoError(err)
	reqID := msg.Get(RequestID).(uint32)

	p.net.router.GetAcceptedFrontier(p.id, chainID, reqID)
}
// acceptedFrontier forwards an AcceptedFrontier response to the router. If any
// container ID fails to parse, the whole message is dropped.
func (p *peer) acceptedFrontier(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	reqID := msg.Get(RequestID).(uint32)

	idSet := ids.Set{}
	for _, idBytes := range msg.Get(ContainerIDs).([][]byte) {
		id, err := ids.ToID(idBytes)
		if err != nil {
			p.net.log.Debug("error parsing ContainerID 0x%x: %s", idBytes, err)
			return
		}
		idSet.Add(id)
	}

	p.net.router.AcceptedFrontier(p.id, chainID, reqID, idSet)
}
// getAccepted forwards a GetAccepted request to the router. If any container
// ID fails to parse, the whole message is dropped.
func (p *peer) getAccepted(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	requestID := msg.Get(RequestID).(uint32)

	containerIDs := ids.Set{}
	for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
		containerID, err := ids.ToID(containerIDBytes)
		if err != nil {
			p.net.log.Debug("error parsing ContainerID 0x%x: %s", containerIDBytes, err)
			return
		}
		containerIDs.Add(containerID)
	}

	p.net.router.GetAccepted(p.id, chainID, requestID, containerIDs)
}
// accepted forwards an Accepted response to the router. If any container ID
// fails to parse, the whole message is dropped.
func (p *peer) accepted(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	requestID := msg.Get(RequestID).(uint32)

	containerIDs := ids.Set{}
	for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
		containerID, err := ids.ToID(containerIDBytes)
		if err != nil {
			p.net.log.Debug("error parsing ContainerID 0x%x: %s", containerIDBytes, err)
			return
		}
		containerIDs.Add(containerID)
	}

	p.net.router.Accepted(p.id, chainID, requestID, containerIDs)
}
// get forwards a Get request for a single container to the router.
func (p *peer) get(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	containerID, err := ids.ToID(msg.Get(ContainerID).([]byte))
	p.net.log.AssertNoError(err)
	reqID := msg.Get(RequestID).(uint32)

	p.net.router.Get(p.id, chainID, reqID, containerID)
}
// put forwards a Put message (a container's bytes in response to a Get) to the
// router.
func (p *peer) put(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	requestID := msg.Get(RequestID).(uint32)
	containerID, err := ids.ToID(msg.Get(ContainerID).([]byte))
	p.net.log.AssertNoError(err)
	container := msg.Get(ContainerBytes).([]byte)

	p.net.router.Put(p.id, chainID, requestID, containerID, container)
}
// pushQuery forwards a PushQuery (a query that carries the container's bytes)
// to the router.
func (p *peer) pushQuery(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	requestID := msg.Get(RequestID).(uint32)
	containerID, err := ids.ToID(msg.Get(ContainerID).([]byte))
	p.net.log.AssertNoError(err)
	container := msg.Get(ContainerBytes).([]byte)

	p.net.router.PushQuery(p.id, chainID, requestID, containerID, container)
}
// pullQuery forwards a PullQuery (a query referencing a container by ID only)
// to the router.
func (p *peer) pullQuery(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	containerID, err := ids.ToID(msg.Get(ContainerID).([]byte))
	p.net.log.AssertNoError(err)
	reqID := msg.Get(RequestID).(uint32)

	p.net.router.PullQuery(p.id, chainID, reqID, containerID)
}
// chits forwards a Chits message (votes in response to a query) to the router.
// If any container ID fails to parse, the whole message is dropped.
func (p *peer) chits(msg Msg) {
	chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
	p.net.log.AssertNoError(err)
	reqID := msg.Get(RequestID).(uint32)

	votes := ids.Set{}
	for _, idBytes := range msg.Get(ContainerIDs).([][]byte) {
		id, err := ids.ToID(idBytes)
		if err != nil {
			p.net.log.Debug("error parsing ContainerID 0x%x: %s", idBytes, err)
			return
		}
		votes.Add(id)
	}

	p.net.router.Chits(p.id, chainID, reqID, votes)
}

92
network/upgrader.go Normal file
View File

@ -0,0 +1,92 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package network
import (
"crypto/tls"
"errors"
"net"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/ids"
)
// errNoCert is returned when a TLS handshake completes without the remote side
// presenting a certificate, leaving nothing to derive a node ID from.
var (
errNoCert = errors.New("tls handshake finished with no peer certificate")
)
// Upgrader upgrades a raw connection into an identified connection, returning
// the ID derived for the remote peer.
type Upgrader interface {
	// Upgrade must be thread safe.
	Upgrade(net.Conn) (ids.ShortID, net.Conn, error)
}
type ipUpgrader struct{}
// NewIPUpgrader returns an Upgrader that identifies peers by their remote
// address instead of a TLS certificate.
func NewIPUpgrader() Upgrader { return ipUpgrader{} }
// Upgrade returns the connection unchanged, with an ID computed as the
// hash-160 of the remote address's string form ("ip:port").
func (ipUpgrader) Upgrade(conn net.Conn) (ids.ShortID, net.Conn, error) {
	remote := conn.RemoteAddr().String()
	id := ids.NewShortID(hashing.ComputeHash160Array([]byte(remote)))
	return id, conn, nil
}
// tlsServerUpgrader performs the server side of a TLS handshake and derives
// the peer's ID from its certificate.
type tlsServerUpgrader struct {
	config *tls.Config
}
// NewTLSServerUpgrader returns an Upgrader that performs a server-side TLS
// handshake using the provided config.
func NewTLSServerUpgrader(config *tls.Config) Upgrader {
	return tlsServerUpgrader{
		config: config,
	}
}
// Upgrade wraps the connection as a TLS server, completes the handshake, and
// derives the peer's ID as hash-160(hash-256(leaf certificate DER bytes)).
// Fails with errNoCert if the client presented no certificate.
//
// NOTE(review): Handshake is called with no deadline on the connection here —
// confirm the caller sets one, or a misbehaving peer can stall this goroutine.
func (t tlsServerUpgrader) Upgrade(conn net.Conn) (ids.ShortID, net.Conn, error) {
	encConn := tls.Server(conn, t.config)
	if err := encConn.Handshake(); err != nil {
		return ids.ShortID{}, nil, err
	}

	connState := encConn.ConnectionState()
	if len(connState.PeerCertificates) == 0 {
		return ids.ShortID{}, nil, errNoCert
	}
	peerCert := connState.PeerCertificates[0]
	id := ids.NewShortID(
		hashing.ComputeHash160Array(
			hashing.ComputeHash256(peerCert.Raw)))
	return id, encConn, nil
}
// tlsClientUpgrader performs the client side of a TLS handshake and derives
// the peer's ID from its certificate.
type tlsClientUpgrader struct {
	config *tls.Config
}
// NewTLSClientUpgrader returns an Upgrader that performs a client-side TLS
// handshake using the provided config.
func NewTLSClientUpgrader(config *tls.Config) Upgrader {
	return tlsClientUpgrader{
		config: config,
	}
}
// Upgrade wraps the connection as a TLS client, completes the handshake, and
// derives the peer's ID as hash-160(hash-256(leaf certificate DER bytes)).
// Fails with errNoCert if the server presented no certificate.
func (t tlsClientUpgrader) Upgrade(conn net.Conn) (ids.ShortID, net.Conn, error) {
	tlsConn := tls.Client(conn, t.config)
	if err := tlsConn.Handshake(); err != nil {
		return ids.ShortID{}, nil, err
	}

	certs := tlsConn.ConnectionState().PeerCertificates
	if len(certs) == 0 {
		return ids.ShortID{}, nil, errNoCert
	}

	digest := hashing.ComputeHash160Array(hashing.ComputeHash256(certs[0].Raw))
	return ids.NewShortID(digest), tlsConn, nil
}

View File

@ -1,281 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
import (
"fmt"
"sync"
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils"
)
// Connections provides an interface for what a group of connections will
// support.
type Connections interface {
Add(salticidae.PeerID, ids.ShortID, utils.IPDesc)
GetPeerID(ids.ShortID) (salticidae.PeerID, bool)
GetID(salticidae.PeerID) (ids.ShortID, bool)
ContainsPeerID(salticidae.PeerID) bool
ContainsID(ids.ShortID) bool
ContainsIP(utils.IPDesc) bool
Remove(salticidae.PeerID, ids.ShortID)
RemovePeerID(salticidae.PeerID)
RemoveID(ids.ShortID)
PeerIDs() []salticidae.PeerID
IDs() ids.ShortSet
IPs() []utils.IPDesc
Conns() ([]salticidae.PeerID, []ids.ShortID, []utils.IPDesc)
Len() int
}
type connections struct {
mux sync.Mutex
// peerID -> id
peerIDToID map[[32]byte]ids.ShortID
// id -> peerID
idToPeerID map[[20]byte]salticidae.PeerID
// id -> ip
idToIP map[[20]byte]utils.IPDesc
}
// NewConnections returns a new and empty connections object
func NewConnections() Connections {
return &connections{
peerIDToID: make(map[[32]byte]ids.ShortID),
idToPeerID: make(map[[20]byte]salticidae.PeerID),
idToIP: make(map[[20]byte]utils.IPDesc),
}
}
// Add Assumes that peer is garbage collected normally
func (c *connections) Add(peer salticidae.PeerID, id ids.ShortID, ip utils.IPDesc) {
c.mux.Lock()
defer c.mux.Unlock()
c.add(peer, id, ip)
}
// GetPeerID returns the peer mapped to the id that is provided if one exists.
func (c *connections) GetPeerID(id ids.ShortID) (salticidae.PeerID, bool) {
c.mux.Lock()
defer c.mux.Unlock()
return c.getPeerID(id)
}
// GetID returns the id mapped to the peer that is provided if one exists.
func (c *connections) GetID(peer salticidae.PeerID) (ids.ShortID, bool) {
c.mux.Lock()
defer c.mux.Unlock()
return c.getID(peer)
}
// ContainsPeerID returns true if the peer is contained in the connection pool
func (c *connections) ContainsPeerID(peer salticidae.PeerID) bool {
_, exists := c.GetID(peer)
return exists
}
// ContainsID returns true if the id is contained in the connection pool
func (c *connections) ContainsID(id ids.ShortID) bool {
_, exists := c.GetPeerID(id)
return exists
}
// ContainsIP returns true if the ip is contained in the connection pool
func (c *connections) ContainsIP(ip utils.IPDesc) bool {
for _, otherIP := range c.IPs() {
if ip.Equal(otherIP) {
return true
}
}
return false
}
// Remove ensures that no connection will have any mapping containing [peer] or
// [id].
func (c *connections) Remove(peer salticidae.PeerID, id ids.ShortID) {
c.mux.Lock()
defer c.mux.Unlock()
c.remove(peer, id)
}
// RemovePeerID ensures that no connection will have a mapping containing [peer]
func (c *connections) RemovePeerID(peer salticidae.PeerID) {
c.mux.Lock()
defer c.mux.Unlock()
c.removePeerID(peer)
}
// RemoveID ensures that no connection will have a mapping containing [id]
func (c *connections) RemoveID(id ids.ShortID) {
c.mux.Lock()
defer c.mux.Unlock()
c.removeID(id)
}
// PeerIDs returns the full list of peers contained in this connection pool.
func (c *connections) PeerIDs() []salticidae.PeerID {
c.mux.Lock()
defer c.mux.Unlock()
return c.peerIDs()
}
// IDs return the set of IDs that are mapping in this connection pool.
func (c *connections) IDs() ids.ShortSet {
c.mux.Lock()
defer c.mux.Unlock()
return c.ids()
}
// IPs return the set of IPs that are mapped in this connection pool.
func (c *connections) IPs() []utils.IPDesc {
c.mux.Lock()
defer c.mux.Unlock()
return c.ips()
}
// Conns return the set of connections in this connection pool.
func (c *connections) Conns() ([]salticidae.PeerID, []ids.ShortID, []utils.IPDesc) {
c.mux.Lock()
defer c.mux.Unlock()
return c.conns()
}
// Len returns the number of elements in the map
func (c *connections) Len() int {
c.mux.Lock()
defer c.mux.Unlock()
return c.len()
}
func (c *connections) add(peer salticidae.PeerID, id ids.ShortID, ip utils.IPDesc) {
c.remove(peer, id)
key := id.Key()
c.peerIDToID[toID(peer)] = id
c.idToPeerID[key] = peer
c.idToIP[key] = ip
}
func (c *connections) getPeerID(id ids.ShortID) (salticidae.PeerID, bool) {
peer, exists := c.idToPeerID[id.Key()]
return peer, exists
}
func (c *connections) getID(peer salticidae.PeerID) (ids.ShortID, bool) {
id, exists := c.peerIDToID[toID(peer)]
return id, exists
}
func (c *connections) remove(peer salticidae.PeerID, id ids.ShortID) {
c.removePeerID(peer)
c.removeID(id)
}
func (c *connections) removePeerID(peer salticidae.PeerID) {
peerID := toID(peer)
if id, exists := c.peerIDToID[peerID]; exists {
idKey := id.Key()
delete(c.peerIDToID, peerID)
delete(c.idToPeerID, idKey)
delete(c.idToIP, idKey)
}
}
func (c *connections) removeID(id ids.ShortID) {
idKey := id.Key()
if peer, exists := c.idToPeerID[idKey]; exists {
delete(c.peerIDToID, toID(peer))
delete(c.idToPeerID, idKey)
delete(c.idToIP, idKey)
}
}
func (c *connections) peerIDs() []salticidae.PeerID {
peers := make([]salticidae.PeerID, 0, len(c.idToPeerID))
for _, peer := range c.idToPeerID {
peers = append(peers, peer)
}
return peers
}
func (c *connections) ids() ids.ShortSet {
ids := ids.ShortSet{}
for _, id := range c.peerIDToID {
ids.Add(id)
}
return ids
}
func (c *connections) ips() []utils.IPDesc {
ips := make([]utils.IPDesc, 0, len(c.idToIP))
for _, ip := range c.idToIP {
ips = append(ips, ip)
}
return ips
}
func (c *connections) conns() ([]salticidae.PeerID, []ids.ShortID, []utils.IPDesc) {
peers := make([]salticidae.PeerID, 0, len(c.idToPeerID))
idList := make([]ids.ShortID, 0, len(c.idToPeerID))
ips := make([]utils.IPDesc, 0, len(c.idToPeerID))
for id, peer := range c.idToPeerID {
idList = append(idList, ids.NewShortID(id))
peers = append(peers, peer)
ips = append(ips, c.idToIP[id])
}
return peers, idList, ips
}
func (c *connections) len() int { return len(c.idToPeerID) }
func toID(peer salticidae.PeerID) [32]byte {
ds := salticidae.NewDataStream(false)
peerInt := peer.AsUInt256()
peerInt.Serialize(ds)
size := ds.Size()
dsb := ds.GetDataInPlace(size)
idBytes := dsb.Get()
id := [32]byte{}
copy(id[:], idBytes)
dsb.Release()
ds.Free()
return id
}
func toIPDesc(addr salticidae.NetAddr) utils.IPDesc {
ip, err := ToIPDesc(addr)
HandshakeNet.log.AssertNoError(err)
return ip
}
// ToIPDesc converts an address to an IP
func ToIPDesc(addr salticidae.NetAddr) (utils.IPDesc, error) {
ip := salticidae.FromBigEndianU32(addr.GetIP())
port := salticidae.FromBigEndianU16(addr.GetPort())
return utils.ToIPDesc(fmt.Sprintf("%d.%d.%d.%d:%d", byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip), port))
}

View File

@ -1,821 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
// #include "salticidae/network.h"
// bool connHandler(msgnetwork_conn_t *, bool, void *);
// void unknownPeerHandler(netaddr_t *, x509_t *, void *);
// void peerHandler(peernetwork_conn_t *, bool, void *);
// void ping(msg_t *, msgnetwork_conn_t *, void *);
// void pong(msg_t *, msgnetwork_conn_t *, void *);
// void getVersion(msg_t *, msgnetwork_conn_t *, void *);
// void version(msg_t *, msgnetwork_conn_t *, void *);
// void getPeerList(msg_t *, msgnetwork_conn_t *, void *);
// void peerList(msg_t *, msgnetwork_conn_t *, void *);
import "C"
import (
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/random"
"github.com/ava-labs/gecko/utils/timer"
)
/*
Receive a new connection.
- Send version message.
Receive version message.
- Validate data
- Send peer list
- Mark this node as being connected
*/
/*
Periodically gossip peerlists.
- Only connected stakers should be gossiped.
- Gossip to a caped number of peers.
- The peers to gossip to should be at least half full of stakers (or all the
stakers should be in the set).
*/
/*
Attempt reconnections
- If a non-staker disconnects, delete the connection
- If a staker disconnects, attempt to reconnect to the node for awhile. If the
node isn't connected to after awhile delete the connection.
*/
// Version this avalanche instance is executing.
var (
VersionPrefix = "avalanche/"
VersionSeparator = "."
MajorVersion = 0
MinorVersion = 2
PatchVersion = 1
ClientVersion = fmt.Sprintf("%s%d%s%d%s%d",
VersionPrefix,
MajorVersion,
VersionSeparator,
MinorVersion,
VersionSeparator,
PatchVersion)
)
const (
// MaxClockDifference allowed between connected nodes.
MaxClockDifference = time.Minute
// PeerListGossipSpacing is the amount of time to wait between pushing this
// node's peer list to other nodes.
PeerListGossipSpacing = time.Minute
// PeerListGossipSize is the number of peers to gossip each period.
PeerListGossipSize = 100
// PeerListStakerGossipFraction calculates the fraction of stakers that are
// gossiped to. If set to 1, then only stakers will be gossiped to.
PeerListStakerGossipFraction = 2
// ConnectTimeout is the amount of time to wait before attempt to connect to
// an unknown peer
ConnectTimeout = 6 * time.Second
// GetVersionTimeout is the amount of time to wait before sending a
// getVersion message to a partially connected peer
GetVersionTimeout = 2 * time.Second
// ReconnectTimeout is the amount of time to wait to reconnect to a staker
// before giving up
ReconnectTimeout = 10 * time.Minute
)
// Manager is the struct that will be accessed on event calls
var (
HandshakeNet = Handshake{}
)
var (
errDSValidators = errors.New("couldn't get validator set of default subnet")
)
// Handshake handles the authentication of new peers. Only valid stakers
// will appear connected.
type Handshake struct {
handshakeMetrics
networkID uint32 // ID of the network I'm running, used to prevent connecting to the wrong network
log logging.Logger
vdrs validators.Set // set of current validators in the AVAnet
myAddr salticidae.NetAddr // IP I communicate to peers
myID ids.ShortID // ID that identifies myself as a staker or not
net salticidae.PeerNetwork // C messaging network
enableStaking bool // Should only be false for local tests
clock timer.Clock
// Connections that I have added by IP, but haven't gotten an ID from
requestedLock sync.Mutex
requested map[string]struct{}
requestedTimeout timer.TimeoutManager // keys are hashes of the ip:port string
// Connections that I have added as a peer, but haven't gotten a version
// message from
pending Connections
versionTimeout timer.TimeoutManager // keys are the peer IDs
// Connections that I have gotten a valid version message from
connections Connections
reconnectTimeout timer.TimeoutManager // keys are the peer IDs
// IPs of nodes I'm connected to will be repeatedly gossiped throughout the network
peerListGossiper *timer.Repeater
// If any chain is blocked on connecting to peers, track these blockers here
awaitingLock sync.Mutex
awaiting []*networking.AwaitingConnections
}
// Initialize to the c networking library. This should only be done once during
// node setup.
func (nm *Handshake) Initialize(
log logging.Logger,
vdrs validators.Set,
myAddr salticidae.NetAddr,
myID ids.ShortID,
peerNet salticidae.PeerNetwork,
registerer prometheus.Registerer,
enableStaking bool,
networkID uint32,
) {
log.AssertTrue(nm.net == nil, "Should only register network handlers once")
nm.handshakeMetrics.Initialize(log, registerer)
nm.networkID = networkID
nm.log = log
nm.vdrs = vdrs
nm.myAddr = myAddr
nm.myID = myID
nm.net = peerNet
nm.enableStaking = enableStaking
nm.requested = make(map[string]struct{})
nm.requestedTimeout.Initialize(ConnectTimeout)
go nm.log.RecoverAndPanic(nm.requestedTimeout.Dispatch)
nm.pending = NewConnections()
nm.versionTimeout.Initialize(GetVersionTimeout)
go nm.log.RecoverAndPanic(nm.versionTimeout.Dispatch)
nm.connections = NewConnections()
nm.reconnectTimeout.Initialize(ReconnectTimeout)
go nm.log.RecoverAndPanic(nm.reconnectTimeout.Dispatch)
nm.peerListGossiper = timer.NewRepeater(nm.gossipPeerList, PeerListGossipSpacing)
go nm.log.RecoverAndPanic(nm.peerListGossiper.Dispatch)
// register c message callbacks
net := peerNet.AsMsgNetwork()
net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), nil)
peerNet.RegPeerHandler(salticidae.PeerNetworkPeerCallback(C.peerHandler), nil)
peerNet.RegUnknownPeerHandler(salticidae.PeerNetworkUnknownPeerCallback(C.unknownPeerHandler), nil)
net.RegHandler(Ping, salticidae.MsgNetworkMsgCallback(C.ping), nil)
net.RegHandler(Pong, salticidae.MsgNetworkMsgCallback(C.pong), nil)
net.RegHandler(GetVersion, salticidae.MsgNetworkMsgCallback(C.getVersion), nil)
net.RegHandler(Version, salticidae.MsgNetworkMsgCallback(C.version), nil)
net.RegHandler(GetPeerList, salticidae.MsgNetworkMsgCallback(C.getPeerList), nil)
net.RegHandler(PeerList, salticidae.MsgNetworkMsgCallback(C.peerList), nil)
}
// ConnectTo add the peer as a connection and connects to them.
//
// assumes the peerID and addr are autofreed
func (nm *Handshake) ConnectTo(peer salticidae.PeerID, stakerID ids.ShortID, addr salticidae.NetAddr) {
if nm.pending.ContainsPeerID(peer) || nm.connections.ContainsPeerID(peer) {
return
}
nm.log.Debug("attempting to connect to %s", stakerID)
nm.net.AddPeer(peer)
nm.net.SetPeerAddr(peer, addr)
nm.net.ConnPeer(peer, 600, 1)
ip := toIPDesc(addr)
nm.pending.Add(peer, stakerID, ip)
peerBytes := toID(peer)
peerID := ids.NewID(peerBytes)
nm.reconnectTimeout.Put(peerID, func() {
nm.pending.Remove(peer, stakerID)
nm.connections.Remove(peer, stakerID)
nm.net.DelPeer(peer)
nm.numPeers.Set(float64(nm.connections.Len()))
})
}
// Connect attempts to start a connection with this provided address
//
// assumes addr is autofreed.
func (nm *Handshake) Connect(addr salticidae.NetAddr) {
ip := toIPDesc(addr)
ipStr := ip.String()
if nm.pending.ContainsIP(ip) || nm.connections.ContainsIP(ip) {
return
}
if !nm.enableStaking {
nm.log.Debug("adding peer %s", ip)
peer := salticidae.NewPeerIDFromNetAddr(addr, true)
nm.ConnectTo(peer, toShortID(ip), addr)
return
}
nm.requestedLock.Lock()
_, exists := nm.requested[ipStr]
nm.requestedLock.Unlock()
if exists {
return
}
nm.log.Debug("adding peer %s", ip)
count := new(int)
*count = 100
handler := new(func())
*handler = func() {
nm.requestedLock.Lock()
defer nm.requestedLock.Unlock()
if *count == 100 {
nm.requested[ipStr] = struct{}{}
}
if _, exists := nm.requested[ipStr]; !exists {
return
}
if *count <= 0 {
delete(nm.requested, ipStr)
return
}
*count--
if nm.pending.ContainsIP(ip) || nm.connections.ContainsIP(ip) {
return
}
nm.log.Debug("attempting to discover peer at %s", ipStr)
msgNet := nm.net.AsMsgNetwork()
msgNet.Connect(addr)
ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr)))
nm.requestedTimeout.Put(ipID, *handler)
}
(*handler)()
}
// AwaitConnections ...
func (nm *Handshake) AwaitConnections(awaiting *networking.AwaitingConnections) {
nm.awaitingLock.Lock()
defer nm.awaitingLock.Unlock()
awaiting.Add(nm.myID)
for _, cert := range nm.connections.IDs().List() {
awaiting.Add(cert)
}
if awaiting.Ready() {
go awaiting.Finish()
} else {
nm.awaiting = append(nm.awaiting, awaiting)
}
}
func (nm *Handshake) gossipPeerList() {
stakers := []ids.ShortID{}
nonStakers := []ids.ShortID{}
for _, id := range nm.connections.IDs().List() {
if nm.vdrs.Contains(id) {
stakers = append(stakers, id)
} else {
nonStakers = append(nonStakers, id)
}
}
numStakersToSend := (PeerListGossipSize + PeerListStakerGossipFraction - 1) / PeerListStakerGossipFraction
if len(stakers) < numStakersToSend {
numStakersToSend = len(stakers)
}
numNonStakersToSend := PeerListGossipSize - numStakersToSend
if len(nonStakers) < numNonStakersToSend {
numNonStakersToSend = len(nonStakers)
}
idsToSend := []ids.ShortID{}
sampler := random.Uniform{N: len(stakers)}
for i := 0; i < numStakersToSend; i++ {
idsToSend = append(idsToSend, stakers[sampler.Sample()])
}
sampler.N = len(nonStakers)
sampler.Replace()
for i := 0; i < numNonStakersToSend; i++ {
idsToSend = append(idsToSend, nonStakers[sampler.Sample()])
}
peers := []salticidae.PeerID{}
for _, id := range idsToSend {
if peer, exists := nm.connections.GetPeerID(id); exists {
peers = append(peers, peer)
}
}
nm.SendPeerList(peers...)
}
// Connections returns the object that tracks the nodes that are currently
// connected to this node.
func (nm *Handshake) Connections() Connections { return nm.connections }
// Shutdown the network
func (nm *Handshake) Shutdown() {
nm.versionTimeout.Stop()
nm.peerListGossiper.Stop()
}
// SendGetVersion to the requested peer
func (nm *Handshake) SendGetVersion(peer salticidae.PeerID) {
build := Builder{}
gv, err := build.GetVersion()
nm.log.AssertNoError(err)
nm.send(gv, peer)
nm.numGetVersionSent.Inc()
}
// SendVersion to the requested peer
func (nm *Handshake) SendVersion(peer salticidae.PeerID) error {
build := Builder{}
v, err := build.Version(nm.networkID, nm.clock.Unix(), toIPDesc(nm.myAddr), ClientVersion)
if err != nil {
return fmt.Errorf("packing version failed due to: %w", err)
}
nm.send(v, peer)
nm.numVersionSent.Inc()
return nil
}
// SendPeerList to the requested peer
func (nm *Handshake) SendPeerList(peers ...salticidae.PeerID) error {
if len(peers) == 0 {
return nil
}
_, ids, ips := nm.connections.Conns()
ipsToSend := []utils.IPDesc(nil)
for i, id := range ids {
ip := ips[i]
if !ip.IsZero() && nm.vdrs.Contains(id) {
ipsToSend = append(ipsToSend, ip)
}
}
if len(ipsToSend) == 0 {
nm.log.Debug("no IPs to send to %d peer(s)", len(peers))
return nil
}
nm.log.Verbo("sending %d ips to %d peer(s)", len(ipsToSend), len(peers))
build := Builder{}
pl, err := build.PeerList(ipsToSend)
if err != nil {
return fmt.Errorf("packing peer list failed due to: %w", err)
}
nm.send(pl, peers...)
nm.numPeerlistSent.Add(float64(len(peers)))
return nil
}
func (nm *Handshake) send(msg Msg, peers ...salticidae.PeerID) {
ds := msg.DataStream()
defer ds.Free()
ba := salticidae.NewByteArrayMovedFromDataStream(ds, false)
defer ba.Free()
cMsg := salticidae.NewMsgMovedFromByteArray(msg.Op(), ba, false)
defer cMsg.Free()
switch len(peers) {
case 0:
case 1:
nm.net.SendMsg(cMsg, peers[0])
default:
nm.net.MulticastMsgByMove(cMsg, peers)
}
}
// connHandler notifies of a new inbound connection
//export connHandler
func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, _ unsafe.Pointer) C.bool {
if !HandshakeNet.enableStaking || !bool(connected) {
return connected
}
HandshakeNet.requestedLock.Lock()
defer HandshakeNet.requestedLock.Unlock()
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
addr := conn.GetAddr().Copy(true)
ip := toIPDesc(addr)
ipStr := ip.String()
ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr)))
HandshakeNet.requestedTimeout.Remove(ipID)
if _, exists := HandshakeNet.requested[ipStr]; !exists {
HandshakeNet.log.Debug("connHandler called with ip %s", ip)
return true
}
delete(HandshakeNet.requested, ipStr)
cert := conn.GetPeerCert()
peer := salticidae.NewPeerIDFromX509(cert, true)
HandshakeNet.ConnectTo(peer, getCert(cert), addr)
return true
}
// assumes peer is autofreed
func (nm *Handshake) connectedToPeer(conn *C.struct_peernetwork_conn_t, peer salticidae.PeerID) {
peerBytes := toID(peer)
peerID := ids.NewID(peerBytes)
// If we're enforcing staking, use a peer's certificate to uniquely identify them
// Otherwise, use a hash of their ip to identify them
cert := ids.ShortID{}
if nm.enableStaking {
cert = getPeerCert(conn)
} else {
key := [20]byte{}
copy(key[:], peerID.Bytes())
cert = ids.NewShortID(key)
}
nm.log.Debug("connected to %s", cert)
nm.reconnectTimeout.Remove(peerID)
handler := new(func())
*handler = func() {
if nm.pending.ContainsPeerID(peer) {
nm.SendGetVersion(peer)
nm.versionTimeout.Put(peerID, *handler)
}
}
(*handler)()
}
// assumes peer is autofreed
func (nm *Handshake) disconnectedFromPeer(peer salticidae.PeerID) {
cert := ids.ShortID{}
if pendingCert, exists := nm.pending.GetID(peer); exists {
cert = pendingCert
nm.log.Debug("disconnected from pending peer %s", cert)
} else if connectedCert, exists := nm.connections.GetID(peer); exists {
cert = connectedCert
nm.log.Debug("disconnected from peer %s", cert)
} else {
return
}
peerBytes := toID(peer)
peerID := ids.NewID(peerBytes)
nm.versionTimeout.Remove(peerID)
nm.connections.Remove(peer, cert)
nm.numPeers.Set(float64(nm.connections.Len()))
if nm.vdrs.Contains(cert) {
nm.reconnectTimeout.Put(peerID, func() {
nm.pending.Remove(peer, cert)
nm.connections.Remove(peer, cert)
nm.net.DelPeer(peer)
nm.numPeers.Set(float64(nm.connections.Len()))
})
nm.pending.Add(peer, cert, utils.IPDesc{})
} else {
nm.pending.Remove(peer, cert)
nm.net.DelPeer(peer)
}
if !nm.enableStaking {
nm.vdrs.Remove(cert)
}
nm.awaitingLock.Lock()
defer nm.awaitingLock.Unlock()
for _, awaiting := range HandshakeNet.awaiting {
awaiting.Remove(cert)
}
}
// checkCompatibility Check to make sure that the peer and I speak the same language.
func (nm *Handshake) checkCompatibility(peerVersion string) bool {
if !strings.HasPrefix(peerVersion, VersionPrefix) {
nm.log.Debug("peer attempted to connect with an invalid version prefix")
return false
}
peerVersion = peerVersion[len(VersionPrefix):]
splitPeerVersion := strings.SplitN(peerVersion, VersionSeparator, 3)
if len(splitPeerVersion) != 3 {
nm.log.Debug("peer attempted to connect with an invalid number of subversions")
return false
}
major, err := strconv.Atoi(splitPeerVersion[0])
if err != nil {
nm.log.Debug("peer attempted to connect with an invalid major version")
return false
}
minor, err := strconv.Atoi(splitPeerVersion[1])
if err != nil {
nm.log.Debug("peer attempted to connect with an invalid minor version")
return false
}
patch, err := strconv.Atoi(splitPeerVersion[2])
if err != nil {
nm.log.Debug("peer attempted to connect with an invalid patch version")
return false
}
switch {
case major < MajorVersion:
// peers major version is too low
return false
case major > MajorVersion:
nm.log.Warn("peer attempted to connect with a higher major version, this client may need to be updated")
return false
}
switch {
case minor < MinorVersion:
// peers minor version is too low
return false
case minor > MinorVersion:
nm.log.Warn("peer attempted to connect with a higher minor version, this client may need to be updated")
return false
}
if patch > PatchVersion {
nm.log.Warn("peer is connecting with a higher patch version, this client may need to be updated")
}
return true
}
// peerHandler notifies a change to the set of connected peers
// connected is true if a new peer is connected
// connected is false if a formerly connected peer has disconnected
//export peerHandler
func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, _ unsafe.Pointer) {
HandshakeNet.log.Debug("peerHandler called")
pConn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
peer := pConn.GetPeerID(true)
if connected {
HandshakeNet.connectedToPeer(_conn, peer)
} else {
HandshakeNet.disconnectedFromPeer(peer)
}
}
// unknownPeerHandler notifies of an unknown peer connection attempt
//export unknownPeerHandler
func unknownPeerHandler(_addr *C.netaddr_t, _cert *C.x509_t, _ unsafe.Pointer) {
	HandshakeNet.log.Debug("unknownPeerHandler called")
	addr := salticidae.NetAddrFromC(salticidae.CNetAddr(_addr)).Copy(true)
	ip := toIPDesc(addr)

	HandshakeNet.log.Debug("adding peer at %s", ip)

	// Derive the peer's identity. With staking enabled the identity is bound
	// to the TLS certificate; without staking it is derived from the network
	// address alone.
	var peer salticidae.PeerID
	var id ids.ShortID
	if HandshakeNet.enableStaking {
		cert := salticidae.X509FromC(salticidae.CX509(_cert))
		peer = salticidae.NewPeerIDFromX509(cert, true)
		id = getCert(cert)
	} else {
		peer = salticidae.NewPeerIDFromNetAddr(addr, true)
		id = toShortID(ip)
	}

	peerBytes := toID(peer)
	peerID := ids.NewID(peerBytes)

	// If the handshake doesn't complete before the reconnect timeout fires,
	// drop all state for this peer and remove it from the network.
	HandshakeNet.reconnectTimeout.Put(peerID, func() {
		HandshakeNet.pending.Remove(peer, id)
		HandshakeNet.connections.Remove(peer, id)
		HandshakeNet.net.DelPeer(peer)

		HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
	})

	// Track the peer as pending (IP unknown until the Version message
	// arrives) and register it with the underlying network.
	HandshakeNet.pending.Add(peer, id, utils.IPDesc{})
	HandshakeNet.net.AddPeer(peer)
}
// ping handles the recept of a ping message by replying with a pong.
//export ping
func ping(_ *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
	peer := conn.GetPeerID(false)
	defer peer.Free()

	builder := Builder{}
	pongMsg, err := builder.Pong()
	HandshakeNet.log.AssertNoError(err)

	HandshakeNet.send(pongMsg, peer)
}
// pong handles the recept of a pong message
// No action is required: the reply itself already proves liveness at the
// transport layer, so the handler is intentionally a no-op.
//export pong
func pong(*C.struct_msg_t, *C.struct_msgnetwork_conn_t, unsafe.Pointer) {}
// getVersion handles the recept of a getVersion message by replying with our
// own Version message.
//export getVersion
func getVersion(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	HandshakeNet.numGetVersionReceived.Inc()

	peerConn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
	requester := peerConn.GetPeerID(false)
	defer requester.Free()

	HandshakeNet.SendVersion(requester)
}
// version handles the recept of a version message
//export version
func version(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	HandshakeNet.numVersionReceived.Inc()

	msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
	conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
	peer := conn.GetPeerID(true)

	peerBytes := toID(peer)
	peerID := ids.NewID(peerBytes)

	// The peer responded, so cancel its pending version timeout.
	HandshakeNet.versionTimeout.Remove(peerID)

	// Only peers in the pending set may complete a handshake.
	id, exists := HandshakeNet.pending.GetID(peer)
	if !exists {
		HandshakeNet.log.Debug("dropping Version message because the peer isn't pending")
		return
	}
	HandshakeNet.pending.Remove(peer, id)

	build := Builder{}
	pMsg, err := build.Parse(Version, msg.GetPayloadByMove())
	if err != nil {
		HandshakeNet.log.Debug("failed to parse Version message")
		HandshakeNet.net.DelPeer(peer)
		return
	}

	// Reject peers on a different network.
	if networkID := pMsg.Get(NetworkID).(uint32); networkID != HandshakeNet.networkID {
		HandshakeNet.log.Debug("peer's network ID doesn't match our networkID: Peer's = %d ; Ours = %d", networkID, HandshakeNet.networkID)
		HandshakeNet.net.DelPeer(peer)
		return
	}

	// Reject peers whose clock is more than MaxClockDifference away from ours.
	myTime := float64(HandshakeNet.clock.Unix())
	if peerTime := float64(pMsg.Get(MyTime).(uint64)); math.Abs(peerTime-myTime) > MaxClockDifference.Seconds() {
		HandshakeNet.log.Debug("peer's clock is too far out of sync with mine. Peer's = %d, Ours = %d (seconds)", uint64(peerTime), uint64(myTime))
		HandshakeNet.net.DelPeer(peer)
		return
	}

	// Reject peers running an incompatible software version.
	if peerVersion := pMsg.Get(VersionStr).(string); !HandshakeNet.checkCompatibility(peerVersion) {
		HandshakeNet.log.Debug("peer version, %s, is not compatible. dropping connection.", peerVersion)
		HandshakeNet.net.DelPeer(peer)
		return
	}

	ip := pMsg.Get(IP).(utils.IPDesc)

	HandshakeNet.log.Debug("Finishing handshake with %s", ip)

	HandshakeNet.SendPeerList(peer)
	HandshakeNet.connections.Add(peer, id, ip)
	HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))

	// Without staking, every connected peer becomes a weight-1 validator.
	if !HandshakeNet.enableStaking {
		HandshakeNet.vdrs.Add(validators.NewValidator(id, 1))
	}

	// Notify every registered awaiter of the new connection. Satisfied
	// awaiters are swap-removed from the slice; i-- re-examines the element
	// moved in from the tail. Finish runs on its own goroutine so it cannot
	// deadlock on awaitingLock.
	HandshakeNet.awaitingLock.Lock()
	defer HandshakeNet.awaitingLock.Unlock()
	for i := 0; i < len(HandshakeNet.awaiting); i++ {
		awaiting := HandshakeNet.awaiting[i]
		awaiting.Add(id)
		if !awaiting.Ready() {
			continue
		}

		newLen := len(HandshakeNet.awaiting) - 1
		HandshakeNet.awaiting[i] = HandshakeNet.awaiting[newLen]
		HandshakeNet.awaiting = HandshakeNet.awaiting[:newLen]
		i--

		go awaiting.Finish()
	}
}
// getPeerList handles the recept of a getPeerList message by replying with
// the list of peers we currently know about.
//export getPeerList
func getPeerList(_ *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	HandshakeNet.numGetPeerlistReceived.Inc()

	peerConn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
	requester := peerConn.GetPeerID(false)
	defer requester.Free()

	HandshakeNet.SendPeerList(requester)
}
// peerList handles the recept of a peerList message
//export peerList
func peerList(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	HandshakeNet.numPeerlistReceived.Inc()

	msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
	build := Builder{}
	pMsg, err := build.Parse(PeerList, msg.GetPayloadByMove())
	if err != nil {
		HandshakeNet.log.Debug("failed to parse PeerList message due to %s", err)
		// TODO: What should we do here?
		return
	}

	// Attempt to connect to every advertised peer, skipping addresses that
	// fail to parse and our own address.
	ips := pMsg.Get(Peers).([]utils.IPDesc)
	cErr := salticidae.NewError()
	for _, ip := range ips {
		addr := salticidae.NewNetAddrFromIPPortString(ip.String(), true, &cErr)

		if cErr.GetCode() != 0 || HandshakeNet.myAddr.IsEq(addr) {
			// Make sure not to connect to myself
			continue
		}

		HandshakeNet.Connect(addr)
	}
}
// getPeerCert returns the short ID derived from the TLS certificate attached
// to the given connection.
func getPeerCert(_conn *C.struct_peernetwork_conn_t) ids.ShortID {
	msgConn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
	cert := msgConn.GetPeerCert()
	return getCert(cert)
}
// getCert derives a node's short ID from its X509 certificate by hashing the
// certificate's DER encoding with the public-key-to-address hash.
func getCert(cert salticidae.X509) ids.ShortID {
	der := cert.GetDer(false)
	certDS := salticidae.NewDataStreamMovedFromByteArray(der, false)
	certBytes := certDS.GetDataInPlace(certDS.Size()).Get()

	certID, err := ids.ToShortID(hashing.PubkeyBytesToAddress(certBytes))

	// Free the C-allocated objects only after certBytes has been consumed:
	// GetDataInPlace returns a view into certDS's memory, not a copy.
	certDS.Free()
	der.Free()

	HandshakeNet.log.AssertNoError(err)
	return certID
}
// toShortID deterministically maps an IP description to a short ID by hashing
// its string representation.
func toShortID(ip utils.IPDesc) ids.ShortID {
	ipStr := ip.String()
	digest := hashing.ComputeHash160Array([]byte(ipStr))
	return ids.NewShortID(digest)
}

View File

@ -1,104 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/utils/logging"
)
// handshakeMetrics holds the prometheus collectors for the handshake network:
// a gauge for the current peer count and a sent/received counter pair for each
// handshake message type.
type handshakeMetrics struct {
	// numPeers tracks the current number of connected peers.
	numPeers prometheus.Gauge

	// Per-message-type sent/received counters.
	numGetVersionSent, numGetVersionReceived,
	numVersionSent, numVersionReceived,
	numGetPeerlistSent, numGetPeerlistReceived,
	numPeerlistSent, numPeerlistReceived prometheus.Counter
}
// Initialize creates the handshake gauges/counters in the "gecko" namespace
// and registers them with the given registerer. Registration failures are
// logged (with the same messages as before) but do not abort initialization,
// so metrics remain best-effort.
func (hm *handshakeMetrics) Initialize(log logging.Logger, registerer prometheus.Registerer) {
	// counter builds a namespaced counter; all handshake counters share the
	// same options shape, differing only in name and help text.
	counter := func(name, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "gecko",
			Name:      name,
			Help:      help,
		})
	}

	hm.numPeers = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "gecko",
		Name:      "peers",
		Help:      "Number of network peers",
	})
	hm.numGetVersionSent = counter("get_version_sent", "Number of get_version messages sent")
	hm.numGetVersionReceived = counter("get_version_received", "Number of get_version messages received")
	hm.numVersionSent = counter("version_sent", "Number of version messages sent")
	hm.numVersionReceived = counter("version_received", "Number of version messages received")
	hm.numGetPeerlistSent = counter("get_peerlist_sent", "Number of get_peerlist messages sent")
	hm.numGetPeerlistReceived = counter("get_peerlist_received", "Number of get_peerlist messages received")
	hm.numPeerlistSent = counter("peerlist_sent", "Number of peerlist messages sent")
	hm.numPeerlistReceived = counter("peerlist_received", "Number of peerlist messages received")

	// Register every collector, logging (but tolerating) failures. The name
	// in each log message matches the metric's registered name.
	for _, c := range []struct {
		name      string
		collector prometheus.Collector
	}{
		{"peers", hm.numPeers},
		{"get_version_sent", hm.numGetVersionSent},
		{"get_version_received", hm.numGetVersionReceived},
		{"version_sent", hm.numVersionSent},
		{"version_received", hm.numVersionReceived},
		{"get_peerlist_sent", hm.numGetPeerlistSent},
		{"get_peerlist_received", hm.numGetPeerlistReceived},
		{"peerlist_sent", hm.numPeerlistSent},
		{"peerlist_received", hm.numPeerlistReceived},
	} {
		if err := registerer.Register(c.collector); err != nil {
			log.Error("Failed to register "+c.name+" statistics due to %s", err)
		}
	}
}

View File

@ -1,658 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
// #include "salticidae/network.h"
// void getAcceptedFrontier(msg_t *, msgnetwork_conn_t *, void *);
// void acceptedFrontier(msg_t *, msgnetwork_conn_t *, void *);
// void getAccepted(msg_t *, msgnetwork_conn_t *, void *);
// void accepted(msg_t *, msgnetwork_conn_t *, void *);
// void get(msg_t *, msgnetwork_conn_t *, void *);
// void put(msg_t *, msgnetwork_conn_t *, void *);
// void pushQuery(msg_t *, msgnetwork_conn_t *, void *);
// void pullQuery(msg_t *, msgnetwork_conn_t *, void *);
// void chits(msg_t *, msgnetwork_conn_t *, void *);
import "C"
import (
"errors"
"fmt"
"math"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/random"
"github.com/ava-labs/gecko/utils/timer"
)
// GossipSize is the maximum number of peers to gossip a container to
const (
	GossipSize = 50
)

var (
	// VotingNet implements the SenderExternal interface.
	// It is a package-level singleton because the cgo callbacks below need a
	// global to dispatch into.
	VotingNet = Voting{}
)

var (
	// errConnectionDropped is returned when a connection closes before a
	// message is received.
	// NOTE(review): not referenced by any code visible in this file — verify
	// it is still used elsewhere before removing.
	errConnectionDropped = errors.New("connection dropped before receiving message")
)
// Voting implements the SenderExternal interface with a c++ library.
type Voting struct {
	votingMetrics // embedded sent/received counters

	log  logging.Logger
	vdrs validators.Set
	// net is the underlying salticidae peer network used to send messages.
	net   salticidae.PeerNetwork
	conns Connections
	// router dispatches received consensus messages to the right chain.
	router router.Router
	// executor runs failure callbacks asynchronously on a dedicated thread.
	executor timer.Executor
}
// Initialize to the c networking library. Should only be called once ever.
// Registers a cgo handler for every consensus message opcode and starts the
// executor's dispatch thread.
func (s *Voting) Initialize(log logging.Logger, vdrs validators.Set, peerNet salticidae.PeerNetwork, conns Connections, router router.Router, registerer prometheus.Registerer) {
	// Guard against double initialization of the package-level singleton.
	log.AssertTrue(s.net == nil, "Should only register network handlers once")
	log.AssertTrue(s.conns == nil, "Should only set connections once")
	log.AssertTrue(s.router == nil, "Should only set the router once")

	s.log = log
	s.vdrs = vdrs
	s.net = peerNet
	s.conns = conns
	s.router = router

	s.votingMetrics.Initialize(log, registerer)

	// Route each opcode to its exported cgo callback.
	net := peerNet.AsMsgNetwork()

	net.RegHandler(GetAcceptedFrontier, salticidae.MsgNetworkMsgCallback(C.getAcceptedFrontier), nil)
	net.RegHandler(AcceptedFrontier, salticidae.MsgNetworkMsgCallback(C.acceptedFrontier), nil)
	net.RegHandler(GetAccepted, salticidae.MsgNetworkMsgCallback(C.getAccepted), nil)
	net.RegHandler(Accepted, salticidae.MsgNetworkMsgCallback(C.accepted), nil)
	net.RegHandler(Get, salticidae.MsgNetworkMsgCallback(C.get), nil)
	net.RegHandler(Put, salticidae.MsgNetworkMsgCallback(C.put), nil)
	net.RegHandler(PushQuery, salticidae.MsgNetworkMsgCallback(C.pushQuery), nil)
	net.RegHandler(PullQuery, salticidae.MsgNetworkMsgCallback(C.pullQuery), nil)
	net.RegHandler(Chits, salticidae.MsgNetworkMsgCallback(C.chits), nil)

	s.executor.Initialize()
	go log.RecoverAndPanic(s.executor.Dispatch)
}
// Shutdown stops the executor's dispatch thread.
func (s *Voting) Shutdown() {
	s.executor.Stop()
}
// Accept is called after every consensus decision; the decided container is
// gossiped to a random subset of peers.
func (s *Voting) Accept(chainID, containerID ids.ID, container []byte) error {
	err := s.gossip(chainID, containerID, container)
	return err
}
// GetAcceptedFrontier implements the Sender interface.
// For disconnected validators the failure callback is queued on the executor
// so the router still receives a terminal event for the request.
func (s *Voting) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) {
	peers := []salticidae.PeerID(nil)
	validatorIDList := validatorIDs.List()
	for _, validatorID := range validatorIDList {
		vID := validatorID // shadow so the closure captures this iteration's value
		if peer, exists := s.conns.GetPeerID(vID); exists {
			peers = append(peers, peer)
			s.log.Verbo("Sending a GetAcceptedFrontier to %s", vID)
		} else {
			s.log.Debug("attempted to send a GetAcceptedFrontier message to a disconnected validator: %s", vID)
			s.executor.Add(func() { s.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
		}
	}

	build := Builder{}
	msg, err := build.GetAcceptedFrontier(chainID, requestID)
	s.log.AssertNoError(err) // this message has fixed size and cannot overflow

	s.log.Verbo("Sending a GetAcceptedFrontier message."+
		"\nNumber of Validators: %d"+
		"\nChain: %s"+
		"\nRequest ID: %d",
		len(peers),
		chainID,
		requestID,
	)
	s.send(msg, peers...)
	s.numGetAcceptedFrontierSent.Add(float64(len(peers)))
}
// AcceptedFrontier implements the Sender interface.
// Replies to a single validator with our accepted frontier; silently drops
// the reply when the validator is disconnected or the message is too large.
func (s *Voting) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	peer, connected := s.conns.GetPeerID(validatorID)
	if !connected {
		// Validator is not connected
		s.log.Debug("attempted to send an AcceptedFrontier message to disconnected validator: %s", validatorID)
		return
	}

	builder := Builder{}
	outMsg, err := builder.AcceptedFrontier(chainID, requestID, containerIDs)
	if err != nil {
		// Packing message failed
		s.log.Error("attempted to pack too large of an AcceptedFrontier message.\nNumber of containerIDs: %d", containerIDs.Len())
		return
	}

	s.log.Verbo("Sending an AcceptedFrontier message."+
		"\nValidator: %s"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nContainer IDs: %s",
		validatorID,
		chainID,
		requestID,
		containerIDs,
	)
	s.send(outMsg, peer)
	s.numAcceptedFrontierSent.Inc()
}
// GetAccepted implements the Sender interface.
// Disconnected validators get a GetAcceptedFailed callback queued on the
// executor; if packing fails, the failure callback is also queued for every
// validator that would have received the message.
func (s *Voting) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	peers := []salticidae.PeerID(nil)
	validatorIDList := validatorIDs.List()
	for _, validatorID := range validatorIDList {
		vID := validatorID // shadow so the closure captures this iteration's value
		// Consistency fix: look up by vID like the sibling senders
		// (GetAcceptedFrontier, PushQuery, PullQuery) rather than the loop
		// variable; the value is identical but the loop variable must not be
		// referenced after shadowing.
		if peer, exists := s.conns.GetPeerID(vID); exists {
			peers = append(peers, peer)
			s.log.Verbo("sending a GetAccepted to %s", vID)
		} else {
			s.log.Debug("attempted to send a GetAccepted message to a disconnected validator: %s", vID)
			s.executor.Add(func() { s.router.GetAcceptedFailed(vID, chainID, requestID) })
		}
	}

	build := Builder{}
	msg, err := build.GetAccepted(chainID, requestID, containerIDs)
	if err != nil {
		// Packing message failed; treat the request as failed for everyone.
		for _, peer := range peers {
			if validatorID, exists := s.conns.GetID(peer); exists {
				s.executor.Add(func() { s.router.GetAcceptedFailed(validatorID, chainID, requestID) })
			}
		}
		s.log.Debug("attempted to pack too large of a GetAccepted message.\nNumber of containerIDs: %d", containerIDs.Len())
		return
	}

	s.log.Verbo("Sending a GetAccepted message."+
		"\nNumber of Validators: %d"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nContainer IDs:%s",
		len(peers),
		chainID,
		requestID,
		containerIDs,
	)
	s.send(msg, peers...)
	s.numGetAcceptedSent.Add(float64(len(peers)))
}
// Accepted implements the Sender interface.
// Replies to a single validator with the accepted container IDs; the reply is
// dropped when the validator is disconnected or the message is too large.
func (s *Voting) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	peer, exists := s.conns.GetPeerID(validatorID)
	if !exists {
		s.log.Debug("attempted to send an Accepted message to a disconnected validator: %s", validatorID)
		return // Validator is not connected
	}

	build := Builder{}
	msg, err := build.Accepted(chainID, requestID, containerIDs)
	if err != nil {
		s.log.Error("attempted to pack too large of an Accepted message.\nNumber of containerIDs: %d", containerIDs.Len())
		return // Packing message failed
	}

	s.log.Verbo("Sending an Accepted message."+
		"\nValidator: %s"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nContainer IDs: %s",
		validatorID,
		chainID,
		requestID,
		containerIDs,
	)
	s.send(msg, peer)
	s.numAcceptedSent.Inc()
}
// Get implements the Sender interface.
// Requests a container from a single validator; if the validator is
// disconnected, GetFailed is queued on the executor so the router still sees
// a terminal event for the request.
func (s *Voting) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
	peer, exists := s.conns.GetPeerID(validatorID)
	if !exists {
		s.log.Debug("attempted to send a Get message to a disconnected validator: %s", validatorID)
		s.executor.Add(func() { s.router.GetFailed(validatorID, chainID, requestID) })
		return // Validator is not connected
	}

	build := Builder{}
	msg, err := build.Get(chainID, requestID, containerID)
	s.log.AssertNoError(err) // this message has fixed size and cannot overflow

	s.log.Verbo("Sending a Get message."+
		"\nValidator: %s"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nContainer ID: %s",
		validatorID,
		chainID,
		requestID,
		containerID,
	)
	s.send(msg, peer)
	s.numGetSent.Inc()
}
// Put implements the Sender interface.
// Sends a container to a single validator; dropped when the validator is
// disconnected or the container does not fit in a message.
func (s *Voting) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
	peer, exists := s.conns.GetPeerID(validatorID)
	if !exists {
		s.log.Debug("attempted to send a Container message to a disconnected validator: %s", validatorID)
		return // Validator is not connected
	}

	build := Builder{}
	msg, err := build.Put(chainID, requestID, containerID, container)
	if err != nil {
		s.log.Error("attempted to pack too large of a Put message.\nContainer length: %d", len(container))
		return // Packing message failed
	}

	s.log.Verbo("Sending a Container message."+
		"\nValidator: %s"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nContainer ID: %s"+
		"\nContainer:\n%s",
		validatorID,
		chainID,
		requestID,
		containerID,
		formatting.DumpBytes{Bytes: container},
	)
	s.send(msg, peer)
	s.numPutSent.Inc()
}
// PushQuery implements the Sender interface.
// Sends the container and a query to the given validators. QueryFailed is
// queued for disconnected validators, and for all targeted validators when
// packing fails.
func (s *Voting) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
	peers := []salticidae.PeerID(nil)
	validatorIDList := validatorIDs.List()
	for _, validatorID := range validatorIDList {
		vID := validatorID // shadow so the closure captures this iteration's value
		if peer, exists := s.conns.GetPeerID(vID); exists {
			peers = append(peers, peer)
			s.log.Verbo("Sending a PushQuery to %s", vID)
		} else {
			s.log.Debug("attempted to send a PushQuery message to a disconnected validator: %s", vID)
			s.executor.Add(func() { s.router.QueryFailed(vID, chainID, requestID) })
		}
	}

	build := Builder{}
	msg, err := build.PushQuery(chainID, requestID, containerID, container)
	if err != nil {
		// Packing message failed; treat the query as failed for everyone.
		for _, peer := range peers {
			if validatorID, exists := s.conns.GetID(peer); exists {
				s.executor.Add(func() { s.router.QueryFailed(validatorID, chainID, requestID) })
			}
		}
		s.log.Error("attempted to pack too large of a PushQuery message.\nContainer length: %d", len(container))
		return // Packing message failed
	}

	s.log.Verbo("Sending a PushQuery message."+
		"\nNumber of Validators: %d"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nContainer ID: %s"+
		"\nContainer:\n%s",
		len(peers),
		chainID,
		requestID,
		containerID,
		formatting.DumpBytes{Bytes: container},
	)
	s.send(msg, peers...)
	s.numPushQuerySent.Add(float64(len(peers)))
}
// PullQuery implements the Sender interface.
// Queries the given validators for their preference of a container by ID.
// QueryFailed is queued for disconnected validators.
func (s *Voting) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) {
	peers := []salticidae.PeerID(nil)
	validatorIDList := validatorIDs.List()
	for _, validatorID := range validatorIDList {
		vID := validatorID // shadow so the closure captures this iteration's value
		if peer, exists := s.conns.GetPeerID(vID); exists {
			peers = append(peers, peer)
			s.log.Verbo("Sending a PullQuery to %s", vID)
		} else {
			s.log.Debug("attempted to send a PullQuery message to a disconnected validator: %s", vID)
			s.executor.Add(func() { s.router.QueryFailed(vID, chainID, requestID) })
		}
	}

	build := Builder{}
	msg, err := build.PullQuery(chainID, requestID, containerID)
	s.log.AssertNoError(err) // this message has fixed size and cannot overflow

	s.log.Verbo("Sending a PullQuery message."+
		"\nNumber of Validators: %d"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nContainer ID: %s",
		len(peers),
		chainID,
		requestID,
		containerID,
	)
	s.send(msg, peers...)
	s.numPullQuerySent.Add(float64(len(peers)))
}
// Chits implements the Sender interface.
// Sends our votes for a query back to a single validator; dropped when the
// validator is disconnected or the vote set does not fit in a message.
func (s *Voting) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
	peer, exists := s.conns.GetPeerID(validatorID)
	if !exists {
		s.log.Debug("attempted to send a Chits message to a disconnected validator: %s", validatorID)
		return // Validator is not connected
	}

	build := Builder{}
	msg, err := build.Chits(chainID, requestID, votes)
	if err != nil {
		s.log.Error("attempted to pack too large of a Chits message.\nChits length: %d", votes.Len())
		return // Packing message failed
	}

	s.log.Verbo("Sending a Chits message."+
		"\nValidator: %s"+
		"\nChain: %s"+
		"\nRequest ID: %d"+
		"\nNumber of Chits: %d",
		validatorID,
		chainID,
		requestID,
		votes.Len(),
	)
	s.send(msg, peer)
	s.numChitsSent.Inc()
}
// Gossip attempts to gossip the container to the network; failures are logged
// rather than returned.
func (s *Voting) Gossip(chainID, containerID ids.ID, container []byte) {
	err := s.gossip(chainID, containerID, container)
	if err == nil {
		return
	}
	s.log.Error("error gossiping container %s to %s: %s", containerID, chainID, err)
}
// send serializes msg and delivers it to the given peers: a single SendMsg
// for one peer, a multicast for several, nothing for none.
func (s *Voting) send(msg Msg, peers ...salticidae.PeerID) {
	// Move the payload through the C data-stream/byte-array wrappers; each
	// intermediate owns C memory and must be freed after use.
	ds := msg.DataStream()
	defer ds.Free()
	ba := salticidae.NewByteArrayMovedFromDataStream(ds, false)
	defer ba.Free()
	cMsg := salticidae.NewMsgMovedFromByteArray(msg.Op(), ba, false)
	defer cMsg.Free()

	switch len(peers) {
	case 0:
	case 1:
		s.net.SendMsg(cMsg, peers[0])
	default:
		s.net.MulticastMsgByMove(cMsg, peers)
	}
}
// gossip sends the container in a Put message (with requestID MaxUint32 as a
// sentinel for unsolicited delivery) to up to GossipSize randomly selected
// peers. Returns an error only if the container is too large to pack.
func (s *Voting) gossip(chainID, containerID ids.ID, container []byte) error {
	allPeers := s.conns.PeerIDs()

	numToGossip := GossipSize
	if numToGossip > len(allPeers) {
		numToGossip = len(allPeers)
	}

	// Sample peers to gossip to.
	// NOTE(review): whether random.Uniform samples without replacement is not
	// visible here — if it samples with replacement, the same peer may be
	// picked more than once. Confirm against utils/random.
	peers := make([]salticidae.PeerID, numToGossip)
	sampler := random.Uniform{N: len(allPeers)}
	for i := range peers {
		peers[i] = allPeers[sampler.Sample()]
	}

	build := Builder{}
	msg, err := build.Put(chainID, math.MaxUint32, containerID, container)
	if err != nil {
		return fmt.Errorf("attempted to pack too large of a Put message.\nContainer length: %d", len(container))
	}

	s.log.Verbo("Sending a Put message to peers."+
		"\nNumber of Peers: %d"+
		"\nChain: %s"+
		"\nContainer ID: %s"+
		"\nContainer:\n%s",
		len(peers),
		chainID,
		containerID,
		formatting.DumpBytes{Bytes: container},
	)
	s.send(msg, peers...)
	s.numPutSent.Add(float64(len(peers)))
	return nil
}
// getAcceptedFrontier handles the recept of a getAcceptedFrontier container
// message for a chain
//export getAcceptedFrontier
func getAcceptedFrontier(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numGetAcceptedFrontierReceived.Inc()

	vdrID, chainID, reqID, _, err := VotingNet.sanitize(_msg, _conn, GetAcceptedFrontier)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize getAcceptedFrontier message due to: %s", err)
		return
	}

	VotingNet.router.GetAcceptedFrontier(vdrID, chainID, reqID)
}
// acceptedFrontier handles the recept of an acceptedFrontier message
//export acceptedFrontier
func acceptedFrontier(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numAcceptedFrontierReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, AcceptedFrontier)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize acceptedFrontier message due to: %s", err)
		return
	}

	// Parse the container IDs; a single malformed ID drops the whole message.
	containerIDs := ids.Set{}
	for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
		containerID, err := ids.ToID(containerIDBytes)
		if err != nil {
			VotingNet.log.Debug("error parsing ContainerID %v: %s", containerIDBytes, err)
			return
		}
		containerIDs.Add(containerID)
	}

	VotingNet.router.AcceptedFrontier(validatorID, chainID, requestID, containerIDs)
}
// getAccepted handles the recept of a getAccepted message
//export getAccepted
func getAccepted(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numGetAcceptedReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, GetAccepted)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize getAccepted message due to: %s", err)
		return
	}

	// Parse the container IDs; a single malformed ID drops the whole message.
	containerIDs := ids.Set{}
	for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
		containerID, err := ids.ToID(containerIDBytes)
		if err != nil {
			VotingNet.log.Debug("error parsing ContainerID %v: %s", containerIDBytes, err)
			return
		}
		containerIDs.Add(containerID)
	}

	VotingNet.router.GetAccepted(validatorID, chainID, requestID, containerIDs)
}
// accepted handles the recept of an accepted message
//export accepted
func accepted(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numAcceptedReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Accepted)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize accepted message due to: %s", err)
		return
	}

	// Parse the container IDs; a single malformed ID drops the whole message.
	containerIDs := ids.Set{}
	for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
		containerID, err := ids.ToID(containerIDBytes)
		if err != nil {
			VotingNet.log.Debug("error parsing ContainerID %v: %s", containerIDBytes, err)
			return
		}
		containerIDs.Add(containerID)
	}

	VotingNet.router.Accepted(validatorID, chainID, requestID, containerIDs)
}
// get handles the recept of a get container message for a chain
//export get
func get(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numGetReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Get)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize get message due to: %s", err)
		return
	}

	// Fix: the parse error was previously discarded, forwarding the zero ID
	// to the router on malformed input. Check it like the sibling handlers
	// (acceptedFrontier, chits) do and drop the message instead.
	containerIDBytes := msg.Get(ContainerID).([]byte)
	containerID, err := ids.ToID(containerIDBytes)
	if err != nil {
		VotingNet.log.Debug("error parsing ContainerID %v: %s", containerIDBytes, err)
		return
	}

	VotingNet.router.Get(validatorID, chainID, requestID, containerID)
}
// put handles the receipt of a container message
//export put
func put(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numPutReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Put)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize put message due to: %s", err)
		return
	}

	// Fix: the parse error was previously discarded, forwarding the zero ID
	// to the router on malformed input. Check it like the sibling handlers do
	// and drop the message instead.
	containerIDBytes := msg.Get(ContainerID).([]byte)
	containerID, err := ids.ToID(containerIDBytes)
	if err != nil {
		VotingNet.log.Debug("error parsing ContainerID %v: %s", containerIDBytes, err)
		return
	}

	containerBytes := msg.Get(ContainerBytes).([]byte)
	VotingNet.router.Put(validatorID, chainID, requestID, containerID, containerBytes)
}
// pushQuery handles the recept of a push query message
//export pushQuery
func pushQuery(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numPushQueryReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, PushQuery)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize pushQuery message due to: %s", err)
		return
	}

	// Fix: the parse error was previously discarded, forwarding the zero ID
	// to the router on malformed input. Check it like the sibling handlers do
	// and drop the message instead.
	containerIDBytes := msg.Get(ContainerID).([]byte)
	containerID, err := ids.ToID(containerIDBytes)
	if err != nil {
		VotingNet.log.Debug("error parsing ContainerID %v: %s", containerIDBytes, err)
		return
	}

	containerBytes := msg.Get(ContainerBytes).([]byte)
	VotingNet.router.PushQuery(validatorID, chainID, requestID, containerID, containerBytes)
}
// pullQuery handles the recept of a pull query message
//export pullQuery
func pullQuery(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numPullQueryReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, PullQuery)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize pullQuery message due to: %s", err)
		return
	}

	// Fix: the parse error was previously discarded, forwarding the zero ID
	// to the router on malformed input. Check it like the sibling handlers do
	// and drop the message instead.
	containerIDBytes := msg.Get(ContainerID).([]byte)
	containerID, err := ids.ToID(containerIDBytes)
	if err != nil {
		VotingNet.log.Debug("error parsing ContainerID %v: %s", containerIDBytes, err)
		return
	}

	VotingNet.router.PullQuery(validatorID, chainID, requestID, containerID)
}
// chits handles the recept of a chits message
//export chits
func chits(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	VotingNet.numChitsReceived.Inc()

	validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Chits)
	if err != nil {
		VotingNet.log.Debug("failed to sanitize chits message due to: %s", err)
		return
	}

	// Parse the votes; a single malformed vote drops the whole message.
	votes := ids.Set{}
	for _, voteBytes := range msg.Get(ContainerIDs).([][]byte) {
		vote, err := ids.ToID(voteBytes)
		if err != nil {
			VotingNet.log.Debug("error parsing chit %v: %s", voteBytes, err)
			return
		}
		votes.Add(vote)
	}

	VotingNet.router.Chits(validatorID, chainID, requestID, votes)
}
// sanitize maps an inbound consensus message to a registered validator,
// parses the payload against the expected opcode, and extracts the chainID
// and requestID fields shared by every consensus message. It returns the
// sending validator, the target chain, the request ID, and the parsed
// message, or an error if the sender is unknown or the payload is malformed.
func (s *Voting) sanitize(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, op salticidae.Opcode) (ids.ShortID, ids.ID, uint32, Msg, error) {
	conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn((*C.peernetwork_conn_t)(_conn)))
	peer := conn.GetPeerID(false)
	defer peer.Free()

	validatorID, exists := s.conns.GetID(peer)
	if !exists {
		// Fix: the old message interpolated validatorID, which on this path
		// is always the zero ShortID returned by the failed lookup and so
		// logged a meaningless ID.
		return ids.ShortID{}, ids.ID{}, 0, nil, fmt.Errorf("received message from un-registered peer")
	}
	s.log.Verbo("received message from %s", validatorID)

	msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
	codec := Codec{}
	pMsg, err := codec.Parse(op, msg.GetPayloadByMove())
	if err != nil {
		return ids.ShortID{}, ids.ID{}, 0, nil, fmt.Errorf("couldn't parse payload: %w", err) // The message couldn't be parsed
	}

	chainID, err := ids.ToID(pMsg.Get(ChainID).([]byte))
	s.log.AssertNoError(err)
	requestID := pMsg.Get(RequestID).(uint32)

	return validatorID, chainID, requestID, pMsg, nil
}

View File

@ -1,188 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/utils/logging"
)
// votingMetrics holds a sent/received prometheus counter pair for every
// consensus message type handled by the Voting network.
type votingMetrics struct {
	numGetAcceptedFrontierSent, numGetAcceptedFrontierReceived,
	numAcceptedFrontierSent, numAcceptedFrontierReceived,
	numGetAcceptedSent, numGetAcceptedReceived,
	numAcceptedSent, numAcceptedReceived,
	numGetSent, numGetReceived,
	numPutSent, numPutReceived,
	numPushQuerySent, numPushQueryReceived,
	numPullQuerySent, numPullQueryReceived,
	numChitsSent, numChitsReceived prometheus.Counter
}
// Initialize creates the consensus-message counters in the "gecko" namespace
// and registers them with the given registerer. Registration failures are
// logged (with the same messages as before) but do not abort initialization,
// so metrics remain best-effort.
func (vm *votingMetrics) Initialize(log logging.Logger, registerer prometheus.Registerer) {
	// counter builds a namespaced counter; all voting counters share the same
	// options shape, differing only in name and help text.
	counter := func(name, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "gecko",
			Name:      name,
			Help:      help,
		})
	}

	vm.numGetAcceptedFrontierSent = counter("get_accepted_frontier_sent", "Number of get accepted frontier messages sent")
	vm.numGetAcceptedFrontierReceived = counter("get_accepted_frontier_received", "Number of get accepted frontier messages received")
	vm.numAcceptedFrontierSent = counter("accepted_frontier_sent", "Number of accepted frontier messages sent")
	vm.numAcceptedFrontierReceived = counter("accepted_frontier_received", "Number of accepted frontier messages received")
	vm.numGetAcceptedSent = counter("get_accepted_sent", "Number of get accepted messages sent")
	vm.numGetAcceptedReceived = counter("get_accepted_received", "Number of get accepted messages received")
	vm.numAcceptedSent = counter("accepted_sent", "Number of accepted messages sent")
	vm.numAcceptedReceived = counter("accepted_received", "Number of accepted messages received")
	vm.numGetSent = counter("get_sent", "Number of get messages sent")
	vm.numGetReceived = counter("get_received", "Number of get messages received")
	vm.numPutSent = counter("put_sent", "Number of put messages sent")
	vm.numPutReceived = counter("put_received", "Number of put messages received")
	vm.numPushQuerySent = counter("push_query_sent", "Number of push query messages sent")
	vm.numPushQueryReceived = counter("push_query_received", "Number of push query messages received")
	vm.numPullQuerySent = counter("pull_query_sent", "Number of pull query messages sent")
	vm.numPullQueryReceived = counter("pull_query_received", "Number of pull query messages received")
	vm.numChitsSent = counter("chits_sent", "Number of chits messages sent")
	vm.numChitsReceived = counter("chits_received", "Number of chits messages received")

	// Register every collector, logging (but tolerating) failures. The name
	// in each log message matches the metric's registered name.
	for _, c := range []struct {
		name      string
		collector prometheus.Collector
	}{
		{"get_accepted_frontier_sent", vm.numGetAcceptedFrontierSent},
		{"get_accepted_frontier_received", vm.numGetAcceptedFrontierReceived},
		{"accepted_frontier_sent", vm.numAcceptedFrontierSent},
		{"accepted_frontier_received", vm.numAcceptedFrontierReceived},
		{"get_accepted_sent", vm.numGetAcceptedSent},
		{"get_accepted_received", vm.numGetAcceptedReceived},
		{"accepted_sent", vm.numAcceptedSent},
		{"accepted_received", vm.numAcceptedReceived},
		{"get_sent", vm.numGetSent},
		{"get_received", vm.numGetReceived},
		{"put_sent", vm.numPutSent},
		{"put_received", vm.numPutReceived},
		{"push_query_sent", vm.numPushQuerySent},
		{"push_query_received", vm.numPushQueryReceived},
		{"pull_query_sent", vm.numPullQuerySent},
		{"pull_query_received", vm.numPullQueryReceived},
		{"chits_sent", vm.numChitsSent},
		{"chits_received", vm.numChitsReceived},
	} {
		if err := registerer.Register(c.collector); err != nil {
			log.Error("Failed to register "+c.name+" statistics due to %s", err)
		}
	}
}

View File

@ -1,75 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package xputtest
// #include "salticidae/network.h"
// void issueTx(msg_t *, msgnetwork_conn_t *, void *);
import "C"
import (
"unsafe"
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/networking"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/utils/hashing"
)
// CClientHandler is the struct that will be accessed on event calls.
// It is package-level because the exported C callbacks below receive no
// Go closure state (they are registered with a nil user-data pointer).
var CClientHandler CClient

// CClient manages a client network using the c networking library.
type CClient struct {
	// issuer forwards received transactions to the owning chain's VM.
	issuer *Issuer
	// net is the salticidae message network replies are sent over.
	net salticidae.MsgNetwork
}
// Initialize to the c networking library: records the issuer and network,
// and routes IssueTx messages to the exported issueTx C callback. This
// should only be called once during setup of the node.
func (h *CClient) Initialize(net salticidae.MsgNetwork, issuer *Issuer) {
	h.issuer = issuer
	h.net = net
	// No user-data pointer is passed; the callback reads CClientHandler.
	net.RegHandler(networking.IssueTx, salticidae.MsgNetworkMsgCallback(C.issueTx), nil)
}
// send serializes msg and writes it to conn. Each intermediate salticidae
// object is freed when this function returns; the moved-from constructors
// are called with copy=false — presumably transferring ownership of the
// underlying buffer rather than duplicating it (confirm against the
// salticidae-go API docs).
func (h *CClient) send(msg networking.Msg, conn salticidae.MsgNetworkConn) {
	ds := msg.DataStream()
	defer ds.Free()
	ba := salticidae.NewByteArrayMovedFromDataStream(ds, false)
	defer ba.Free()
	cMsg := salticidae.NewMsgMovedFromByteArray(msg.Op(), ba, false)
	defer cMsg.Free()
	h.net.SendMsg(cMsg, conn)
}
// issueTx handles the receipt of an IssueTx message: it parses the chain ID
// and raw tx bytes out of the payload, hands the tx to the issuer, and
// replies with a DecidedTx message once the tx reaches a final status.
// The comment directly below is a cgo directive and must immediately
// precede the function declaration.
//export issueTx
func issueTx(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
	build := networking.Builder{}
	pMsg, err := build.Parse(networking.IssueTx, msg.GetPayloadByMove())
	if err != nil {
		// Malformed payloads are silently dropped.
		return
	}
	chainID, _ := ids.ToID(pMsg.Get(networking.ChainID).([]byte))
	txBytes := pMsg.Get(networking.Tx).([]byte)
	// The tx ID is the SHA-256 of the raw tx bytes.
	txID := ids.NewID(hashing.ComputeHash256Array(txBytes))
	// Copy the connection handle; it is freed inside the finalization
	// callback after the reply has been sent.
	conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)).Copy(false)
	CClientHandler.issuer.IssueTx(chainID, txBytes, func(status choices.Status) {
		build := networking.Builder{}
		msg, _ := build.DecidedTx(txID, status)
		CClientHandler.send(msg, conn)
		conn.Free()
	})
}

View File

@ -1,77 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package xputtest
import (
"sync"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/utils/logging"
)
// issuableVM is implemented by VMs that accept raw transaction bytes and
// report the transaction's final status through a callback.
type issuableVM interface {
	IssueTx([]byte, func(choices.Status)) (ids.ID, error)
}
// Issuer manages all the chain transaction flushing.
type Issuer struct {
	// lock guards the maps below.
	lock sync.Mutex
	log  logging.Logger
	// vms maps a chain ID key to the VM transactions are issued to.
	vms map[[32]byte]issuableVM
	// locks maps a chain ID key to that chain's context lock, which is
	// held while issuing to the chain's VM.
	locks map[[32]byte]sync.Locker
	// callbacks is drained by a worker goroutine started in Initialize.
	callbacks chan func()
}
// Initialize this flusher: records the logger, allocates the chain
// registries and callback queue, and starts the background worker that
// executes queued callbacks. Must be called before any other method.
func (i *Issuer) Initialize(log logging.Logger) {
	i.lock.Lock()
	defer i.lock.Unlock()

	i.log = log
	i.callbacks = make(chan func(), 1000)
	i.locks = make(map[[32]byte]sync.Locker)
	i.vms = make(map[[32]byte]issuableVM)

	// Drain the queue for the lifetime of the process.
	go func() {
		for cb := range i.callbacks {
			cb()
		}
	}()
}
// RegisterChain implements the registrant: if the chain's VM supports
// transaction issuance, remember the VM and the chain's context lock,
// keyed by chain ID. Non-issuable VMs are ignored.
func (i *Issuer) RegisterChain(ctx *snow.Context, vm interface{}) {
	i.lock.Lock()
	defer i.lock.Unlock()

	issuable, ok := vm.(issuableVM)
	if !ok {
		return
	}
	key := ctx.ChainID.Key()
	i.vms[key] = issuable
	i.locks[key] = &ctx.Lock
}
// IssueTx issue the transaction to the chain and register the timeout.
// The actual issuance runs asynchronously on the worker goroutine while
// holding the chain's context lock; finalized is invoked with the tx's
// final status. Requests for unregistered chains are logged and dropped.
func (i *Issuer) IssueTx(chainID ids.ID, tx []byte, finalized func(choices.Status)) {
	i.lock.Lock()
	defer i.lock.Unlock()

	key := chainID.Key()
	chainLock, exists := i.locks[key]
	if !exists {
		i.log.Warn("Attempted to issue a Tx to an unsupported chain %s", chainID)
		return
	}

	i.callbacks <- func() {
		chainLock.Lock()
		defer chainLock.Unlock()

		vm, registered := i.vms[key]
		if !registered {
			return
		}
		if _, err := vm.IssueTx(tx, finalized); err != nil {
			i.log.Error("Issuing the tx returned with %s unexpectedly", err)
		}
	}
}

View File

@ -3,23 +3,16 @@
package node
// #include "salticidae/network.h"
// void onTerm(threadcall_handle_t *, void *);
// void errorHandler(SalticidaeCError *, bool, int32_t, void *);
import "C"
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"path"
"sync"
"unsafe"
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/api"
"github.com/ava-labs/gecko/api/admin"
@ -32,14 +25,14 @@ import (
"github.com/ava-labs/gecko/database/prefixdb"
"github.com/ava-labs/gecko/genesis"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/networking"
"github.com/ava-labs/gecko/networking/xputtest"
"github.com/ava-labs/gecko/network"
"github.com/ava-labs/gecko/snow/triggers"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/wrappers"
"github.com/ava-labs/gecko/version"
"github.com/ava-labs/gecko/vms"
"github.com/ava-labs/gecko/vms/avm"
"github.com/ava-labs/gecko/vms/nftfx"
@ -52,12 +45,16 @@ import (
"github.com/ava-labs/gecko/vms/timestampvm"
)
// Networking constants
const (
maxMessageSize = 1 << 25 // maximum size of a message sent with salticidae
TCP = "tcp"
)
var (
genesisHashKey = []byte("genesisID")
nodeVersion = version.NewDefaultVersion("ava", 0, 1, 0)
versionParser = version.NewDefaultParser()
)
// MainNode is the reference for node callbacks
@ -92,30 +89,12 @@ type Node struct {
DecisionDispatcher *triggers.EventDispatcher
ConsensusDispatcher *triggers.EventDispatcher
// Event loop manager
EC salticidae.EventContext
// Caller to the event context
TCall salticidae.ThreadCall
// Network that manages validator peers
PeerNet salticidae.PeerNetwork
// Network that manages clients
ClientNet salticidae.MsgNetwork // TODO: Remove
// API that handles new connections
ValidatorAPI *networking.Handshake
// API that handles voting messages
ConsensusAPI *networking.Voting
// Net runs the networking stack
Net network.Network
// current validators of the network
vdrs validators.Manager
// APIs that handle client messages
// TODO: Remove
Issuer *xputtest.Issuer
CClientAPI *xputtest.CClient
// Handles HTTP API calls
APIServer api.Server
@ -132,71 +111,37 @@ type Node struct {
******************************************************************************
*/
//export onTerm
func onTerm(*C.threadcall_handle_t, unsafe.Pointer) {
MainNode.Log.Debug("Terminate signal received")
MainNode.EC.Stop()
}
//export errorHandler
func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, asyncID C.int32_t, _ unsafe.Pointer) {
err := (*salticidae.Error)(unsafe.Pointer(_err))
if fatal {
MainNode.Log.Fatal("Error during async call: %s", salticidae.StrError(err.GetCode()))
MainNode.EC.Stop()
return
func (n *Node) initNetworking() error {
listener, err := net.Listen(TCP, n.Config.StakingIP.PortString())
if err != nil {
return err
}
MainNode.Log.Debug("Error during async with ID %d call: %s", asyncID, salticidae.StrError(err.GetCode()))
}
func (n *Node) initNetlib() error {
// Create main event context
n.EC = salticidae.NewEventContext()
n.TCall = salticidae.NewThreadCall(n.EC)
n.nodeCloser = utils.HandleSignals(func(os.Signal) {
n.TCall.AsyncCall(salticidae.ThreadCallCallback(C.onTerm), nil)
}, os.Interrupt, os.Kill)
// Create peer network config, may have tls enabled
peerConfig := salticidae.NewPeerNetworkConfig()
peerConfig.ConnTimeout(60)
msgConfig := peerConfig.AsMsgNetworkConfig()
msgConfig.MaxMsgSize(maxMessageSize)
dialer := network.NewDialer(TCP)
var serverUpgrader, clientUpgrader network.Upgrader
if n.Config.EnableStaking {
msgConfig.EnableTLS(true)
msgConfig.TLSKeyFile(n.Config.StakingKeyFile)
msgConfig.TLSCertFile(n.Config.StakingCertFile)
}
// Create the peer network
err := salticidae.NewError()
n.PeerNet = salticidae.NewPeerNetwork(n.EC, peerConfig, &err)
if code := err.GetCode(); code != 0 {
return errors.New(salticidae.StrError(code))
}
// Add peer network error handling
net := n.PeerNet.AsMsgNetwork()
net.RegErrorHandler(salticidae.MsgNetworkErrorCallback(C.errorHandler), nil)
if n.Config.ThroughputServerEnabled {
// Create the client network
msgConfig := salticidae.NewMsgNetworkConfig()
msgConfig.MaxMsgSize(maxMessageSize)
n.ClientNet = salticidae.NewMsgNetwork(n.EC, msgConfig, &err)
if code := err.GetCode(); code != 0 {
return errors.New(salticidae.StrError(code))
// TODO: this TLS config will never accept a connection because the cert pool is empty.
cert, err := tls.LoadX509KeyPair(n.Config.StakingCertFile, n.Config.StakingKeyFile)
if err != nil {
return err
}
// Add client network error handling
n.ClientNet.RegErrorHandler(salticidae.MsgNetworkErrorCallback(C.errorHandler), nil)
certPool := x509.NewCertPool()
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
ServerName: "ava",
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
}
serverUpgrader = network.NewTLSServerUpgrader(tlsConfig)
clientUpgrader = network.NewTLSClientUpgrader(tlsConfig)
} else {
serverUpgrader = network.NewIPUpgrader()
clientUpgrader = network.NewIPUpgrader()
}
return nil
}
func (n *Node) initValidatorNet() error {
// Initialize validator manager and default subnet's validator set
defaultSubnetValidators := validators.NewSet()
if !n.Config.EnableStaking {
@ -205,104 +150,63 @@ func (n *Node) initValidatorNet() error {
n.vdrs = validators.NewManager()
n.vdrs.PutValidatorSet(platformvm.DefaultSubnetID, defaultSubnetValidators)
cErr := salticidae.NewError()
serverIP := salticidae.NewNetAddrFromIPPortString(n.Config.StakingIP.String(), true, &cErr)
if code := cErr.GetCode(); code != 0 {
return errors.New(salticidae.StrError(code))
n.Net = network.NewDefaultNetwork(
n.Log,
n.ID,
n.Config.StakingIP,
n.Config.NetworkID,
nodeVersion,
versionParser,
listener,
dialer,
serverUpgrader,
clientUpgrader,
defaultSubnetValidators,
n.Config.ConsensusRouter,
)
if !n.Config.EnableStaking {
n.Net.RegisterHandler(&insecureValidatorManager{
vdrs: defaultSubnetValidators,
})
}
n.ValidatorAPI = &networking.HandshakeNet
n.ValidatorAPI.Initialize(
/*log=*/ n.Log,
/*validators=*/ defaultSubnetValidators,
/*myIP=*/ serverIP,
/*myID=*/ n.ID,
/*network=*/ n.PeerNet,
/*metrics=*/ n.Config.ConsensusParams.Metrics,
/*enableStaking=*/ n.Config.EnableStaking,
/*networkID=*/ n.Config.NetworkID,
)
n.nodeCloser = utils.HandleSignals(func(os.Signal) {
n.Net.Close()
}, os.Interrupt, os.Kill)
return nil
}
func (n *Node) initConsensusNet() {
vdrs, ok := n.vdrs.GetValidatorSet(platformvm.DefaultSubnetID)
n.Log.AssertTrue(ok, "should have initialize the validator set already")
n.ConsensusAPI = &networking.VotingNet
n.ConsensusAPI.Initialize(n.Log, vdrs, n.PeerNet, n.ValidatorAPI.Connections(), n.chainManager.Router(), n.Config.ConsensusParams.Metrics)
n.Log.AssertNoError(n.ConsensusDispatcher.Register("gossip", n.ConsensusAPI))
// insecureValidatorManager registers every peer that connects as a
// weight-1 validator. It is installed as a network handler only when
// staking is disabled, so all peers are trusted equally.
type insecureValidatorManager struct {
	vdrs validators.Set
}
func (n *Node) initClients() {
n.Issuer = &xputtest.Issuer{}
n.Issuer.Initialize(n.Log)
n.CClientAPI = &xputtest.CClientHandler
n.CClientAPI.Initialize(n.ClientNet, n.Issuer)
n.chainManager.AddRegistrant(n.Issuer)
// Connected adds the newly connected peer to the validator set with a
// weight of 1.
// NOTE(review): the false return presumably keeps this handler
// registered for future peers — confirm against the network handler
// contract.
func (i *insecureValidatorManager) Connected(vdrID ids.ShortID) bool {
	i.vdrs.Add(validators.NewValidator(vdrID, 1))
	return false
}
// StartConsensusServer starts the P2P server this node uses to communicate
// with other nodes
func (n *Node) StartConsensusServer() error {
n.Log.Verbo("starting the consensus server")
n.PeerNet.AsMsgNetwork().Start()
err := salticidae.NewError()
// The IP this node listens on for P2P messaging
serverIP := salticidae.NewNetAddrFromIPPortString(n.Config.StakingIP.String(), true, &err)
if code := err.GetCode(); code != 0 {
return fmt.Errorf("failed to create ip addr: %s", salticidae.StrError(code))
}
// Listen for P2P messages
n.PeerNet.Listen(serverIP, &err)
if code := err.GetCode(); code != 0 {
return fmt.Errorf("failed to listen on consensus server at %s: %s", n.Config.StakingIP, salticidae.StrError(code))
}
// Start a server to handle throughput tests if configuration says to. Disabled by default.
if n.Config.ThroughputServerEnabled {
n.ClientNet.Start()
clientIP := salticidae.NewNetAddrFromIPPortString(fmt.Sprintf("127.0.0.1:%d", n.Config.ThroughputPort), true, &err)
if code := err.GetCode(); code != 0 {
return fmt.Errorf("failed to start xput server: %s", salticidae.StrError(code))
}
n.ClientNet.Listen(clientIP, &err)
if code := err.GetCode(); code != 0 {
return fmt.Errorf("failed to listen on xput server at 127.0.0.1:%d: %s", n.Config.ThroughputPort, salticidae.StrError(code))
}
}
// Disconnected removes the departed peer from the validator set.
// NOTE(review): the false return presumably keeps this handler
// registered — confirm against the network handler contract.
func (i *insecureValidatorManager) Disconnected(vdrID ids.ShortID) bool {
	i.vdrs.Remove(vdrID)
	return false
}
// Dispatch starts the node's servers.
// Returns when the node exits.
func (n *Node) Dispatch() {
// Add bootstrap nodes to the peer network
for _, peer := range n.Config.BootstrapPeers {
if !peer.IP.Equal(n.Config.StakingIP) {
bootstrapAddr := salticidae.NewNetAddrFromIPPortString(peer.IP.String(), true, &err)
if code := err.GetCode(); code != 0 {
return fmt.Errorf("failed to create bootstrap ip addr: %s", salticidae.StrError(code))
}
n.ValidatorAPI.Connect(bootstrapAddr)
n.Net.Track(peer.IP)
} else {
n.Log.Error("can't add self as a bootstrapper")
}
}
return nil
n.Net.Dispatch()
}
// Dispatch starts the node's servers.
// Returns when the node exits.
func (n *Node) Dispatch() { n.EC.Dispatch() }
/*
******************************************************************************
*********************** End P2P Networking Section ***************************
@ -490,7 +394,7 @@ func (n *Node) initAPIServer() {
err := n.APIServer.Dispatch()
n.Log.Fatal("API server initialization failed with %s", err)
n.TCall.AsyncCall(salticidae.ThreadCallCallback(C.onTerm), nil)
n.Net.Close()
})
}
@ -505,12 +409,11 @@ func (n *Node) initChainManager() {
n.ConsensusDispatcher,
n.DB,
n.Config.ConsensusRouter,
&networking.VotingNet,
n.Net,
n.Config.ConsensusParams,
n.vdrs,
n.ID,
n.Config.NetworkID,
n.ValidatorAPI,
&n.APIServer,
&n.keystoreServer,
&n.sharedMemory,
@ -554,7 +457,7 @@ func (n *Node) initMetricsAPI() {
func (n *Node) initAdminAPI() {
if n.Config.AdminAPIEnabled {
n.Log.Info("initializing Admin API")
service := admin.NewService(n.ID, n.Config.NetworkID, n.Log, n.chainManager, n.ValidatorAPI.Connections(), &n.APIServer)
service := admin.NewService(n.ID, n.Config.NetworkID, n.Log, n.chainManager, n.Net, &n.APIServer)
n.APIServer.AddRoute(service, &sync.RWMutex{}, "admin", "", n.HTTPLog)
}
}
@ -624,7 +527,7 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg
// initialize shared memory
n.initSharedMemory()
if err = n.initNetlib(); err != nil { // Set up all networking
if err = n.initNetworking(); err != nil { // Set up all networking
return fmt.Errorf("problem initializing networking: %w", err)
}
@ -633,22 +536,12 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg
n.initKeystoreAPI() // Start the Keystore API
n.initMetricsAPI() // Start the Metrics API
// Start node-to-node consensus server
if err := n.initValidatorNet(); err != nil { // Set up the validator handshake + authentication
return fmt.Errorf("problem initializing validator network: %w", err)
}
if err := n.initVMManager(); err != nil { // Set up the vm manager
return fmt.Errorf("problem initializing the VM manager: %w", err)
}
n.initEventDispatcher() // Set up the event dipatcher
n.initChainManager() // Set up the chain manager
n.initConsensusNet() // Set up the main consensus network
// TODO: Remove once API is fully featured for throughput tests
if n.Config.ThroughputServerEnabled {
n.initClients() // Set up the client servers
}
n.initAdminAPI() // Start the Admin API
n.initIPCAPI() // Start the IPC API
@ -662,8 +555,7 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg
// Shutdown this node
func (n *Node) Shutdown() {
n.Log.Info("shutting down the node")
n.ValidatorAPI.Shutdown()
n.ConsensusAPI.Shutdown()
n.Net.Close()
n.chainManager.Shutdown()
utils.ClearSignals(n.nodeCloser)
}

View File

@ -8,7 +8,6 @@ import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"io"
"golang.org/x/crypto/ripemd160"
@ -50,7 +49,7 @@ func ByteArraysToHash256Array(byteArray ...[]byte) [32]byte {
for _, b := range byteArray {
err := binary.Write(buffer, binary.LittleEndian, b)
if err != nil {
fmt.Println(err)
panic(err)
}
}
return ComputeHash256Array(buffer.Bytes())

66
version/parser.go Normal file
View File

@ -0,0 +1,66 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package version
import (
"fmt"
"strconv"
"strings"
)
// Parser converts the textual form of a Version (as produced by
// Version.String) back into a Version.
type Parser interface {
	// Parse returns the Version encoded in the given string, or an error
	// if the string is not a well-formed version.
	Parse(string) (Version, error)
}
// parser is the default Parser implementation. It splits the input on the
// configured application and version separators.
type parser struct {
	appSeparator     string
	versionSeparator string
}
// NewDefaultParser returns a Parser that understands the default "/"
// application separator and "." version separator (e.g. "ava/0.1.0").
func NewDefaultParser() Parser {
	return NewParser(defaultAppSeparator, defaultVersionSeparator)
}
// NewParser returns a Parser that splits version strings on the provided
// application and version separators.
func NewParser(appSeparator string, versionSeparator string) Parser {
	p := parser{
		appSeparator:     appSeparator,
		versionSeparator: versionSeparator,
	}
	return &p
}
// Parse splits s into its application name and three numeric version
// components and returns the corresponding Version. The expected layout
// is "<app><appSep><major><verSep><minor><verSep><patch>", e.g.
// "ava/0.1.0" with the default separators. A descriptive error is
// returned if the layout or any numeric component is malformed.
func (p *parser) Parse(s string) (Version, error) {
	splitApp := strings.SplitN(s, p.appSeparator, 2)
	if len(splitApp) != 2 {
		return nil, fmt.Errorf("failed to parse %s as a version", s)
	}
	splitVersion := strings.SplitN(splitApp[1], p.versionSeparator, 3)
	if len(splitVersion) != 3 {
		return nil, fmt.Errorf("failed to parse %s as a version", s)
	}

	// Convert major, minor, patch in order, surfacing the first failure.
	var nums [3]int
	for i, numStr := range splitVersion {
		num, err := strconv.Atoi(numStr)
		if err != nil {
			return nil, fmt.Errorf("failed to parse %s as a version due to %w", s, err)
		}
		nums[i] = num
	}

	return NewVersion(
		splitApp[0],
		p.appSeparator,
		p.versionSeparator,
		nums[0],
		nums[1],
		nums[2],
	), nil
}

142
version/version.go Normal file
View File

@ -0,0 +1,142 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package version
import (
"errors"
"fmt"
)
const (
	// Default separators used by NewDefaultVersion and NewDefaultParser;
	// with these, versions render as "<app>/<major>.<minor>.<patch>".
	defaultAppSeparator     = "/"
	defaultVersionSeparator = "."
)
var (
	// Errors returned by Compatible when two versions cannot interoperate.
	errDifferentApps  = errors.New("different applications")
	errDifferentMajor = errors.New("different major version")
	errDifferentMinor = errors.New("different minor version")
)
// Version describes an application version of the form
// <app><appSep><major><verSep><minor><verSep><patch>.
type Version interface {
	fmt.Stringer

	// App returns the name of the application this version belongs to.
	App() string
	Major() int
	Minor() int
	Patch() int

	// Compatible returns nil if the given version can interoperate with
	// this one.
	Compatible(Version) error
	// Before reports whether this version is strictly lower than the
	// given version, comparing major, then minor, then patch.
	Before(Version) bool
}
// version is the immutable default implementation of Version.
type version struct {
	app   string
	major int
	minor int
	patch int
	// str caches the formatted representation, built once in NewVersion,
	// so String() performs no work.
	str string
}
// NewDefaultVersion returns a Version for app using the default "/"
// application separator and "." version separator, e.g. "ava/0.1.0".
func NewDefaultVersion(
	app string,
	major int,
	minor int,
	patch int,
) Version {
	return NewVersion(app, defaultAppSeparator, defaultVersionSeparator, major, minor, patch)
}
// NewVersion returns a Version with explicit separators. The textual
// form is precomputed here so that String() is a simple field read.
func NewVersion(
	app string,
	appSeparator string,
	versionSeparator string,
	major int,
	minor int,
	patch int,
) Version {
	formatted := fmt.Sprintf(
		"%s%s%d%s%d%s%d",
		app, appSeparator, major, versionSeparator, minor, versionSeparator, patch,
	)
	return &version{
		app:   app,
		major: major,
		minor: minor,
		patch: patch,
		str:   formatted,
	}
}
// App returns the application name.
func (v *version) App() string { return v.app }

// Major returns the major version component.
func (v *version) Major() int { return v.major }

// Minor returns the minor version component.
func (v *version) Minor() int { return v.minor }

// Patch returns the patch version component.
func (v *version) Patch() int { return v.patch }

// String returns the precomputed textual form, e.g. "ava/0.1.0".
func (v *version) String() string { return v.str }
// Compatible returns nil when o can interoperate with v: the application
// names and the major and minor components must all match. Patch
// differences are always compatible.
func (v *version) Compatible(o Version) error {
	if v.App() != o.App() {
		return errDifferentApps
	}
	if v.Major() != o.Major() {
		return errDifferentMajor
	}
	if v.Minor() != o.Minor() {
		return errDifferentMinor
	}
	return nil
}
// Before reports whether v is strictly lower than o, comparing major,
// then minor, then patch. Versions of different applications are never
// ordered relative to each other, and equal versions are not Before.
func (v *version) Before(o Version) bool {
	if v.App() != o.App() {
		return false
	}
	// Compare components most-significant first; the first differing
	// component decides the order.
	if v.Major() != o.Major() {
		return v.Major() < o.Major()
	}
	if v.Minor() != o.Minor() {
		return v.Minor() < o.Minor()
	}
	return v.Patch() < o.Patch()
}