gecko/networking/voting_handlers.go

649 lines
20 KiB
Go

// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package networking
// #include "salticidae/network.h"
// void getAcceptedFrontier(msg_t *, msgnetwork_conn_t *, void *);
// void acceptedFrontier(msg_t *, msgnetwork_conn_t *, void *);
// void getAccepted(msg_t *, msgnetwork_conn_t *, void *);
// void accepted(msg_t *, msgnetwork_conn_t *, void *);
// void get(msg_t *, msgnetwork_conn_t *, void *);
// void put(msg_t *, msgnetwork_conn_t *, void *);
// void pushQuery(msg_t *, msgnetwork_conn_t *, void *);
// void pullQuery(msg_t *, msgnetwork_conn_t *, void *);
// void chits(msg_t *, msgnetwork_conn_t *, void *);
import "C"
import (
"errors"
"fmt"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/salticidae-go"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
)
var (
// VotingNet implements the SenderExternal interface.
VotingNet = Voting{}
)
var (
errConnectionDropped = errors.New("connection dropped before receiving message")
)
// Voting implements the SenderExternal interface with a c++ library.
type Voting struct {
votingMetrics
log logging.Logger
vdrs validators.Set
net salticidae.PeerNetwork
conns Connections
router router.Router
executor timer.Executor
}
// Initialize to the c networking library. Should only be called once ever.
func (s *Voting) Initialize(log logging.Logger, vdrs validators.Set, peerNet salticidae.PeerNetwork, conns Connections, router router.Router, registerer prometheus.Registerer) {
log.AssertTrue(s.net == nil, "Should only register network handlers once")
log.AssertTrue(s.conns == nil, "Should only set connections once")
log.AssertTrue(s.router == nil, "Should only set the router once")
s.log = log
s.vdrs = vdrs
s.net = peerNet
s.conns = conns
s.router = router
s.votingMetrics.Initialize(log, registerer)
net := peerNet.AsMsgNetwork()
net.RegHandler(GetAcceptedFrontier, salticidae.MsgNetworkMsgCallback(C.getAcceptedFrontier), nil)
net.RegHandler(AcceptedFrontier, salticidae.MsgNetworkMsgCallback(C.acceptedFrontier), nil)
net.RegHandler(GetAccepted, salticidae.MsgNetworkMsgCallback(C.getAccepted), nil)
net.RegHandler(Accepted, salticidae.MsgNetworkMsgCallback(C.accepted), nil)
net.RegHandler(Get, salticidae.MsgNetworkMsgCallback(C.get), nil)
net.RegHandler(Put, salticidae.MsgNetworkMsgCallback(C.put), nil)
net.RegHandler(PushQuery, salticidae.MsgNetworkMsgCallback(C.pushQuery), nil)
net.RegHandler(PullQuery, salticidae.MsgNetworkMsgCallback(C.pullQuery), nil)
net.RegHandler(Chits, salticidae.MsgNetworkMsgCallback(C.chits), nil)
s.executor.Initialize()
go log.RecoverAndPanic(s.executor.Dispatch)
}
// Shutdown threads
func (s *Voting) Shutdown() { s.executor.Stop() }
// Accept is called after every consensus decision
func (s *Voting) Accept(chainID, containerID ids.ID, container []byte) error {
addrs := []salticidae.NetAddr(nil)
allAddrs, allIDs := s.conns.RawConns()
for i, id := range allIDs {
if !s.vdrs.Contains(id) {
addrs = append(addrs, allAddrs[i])
}
}
build := Builder{}
msg, err := build.Put(chainID, 0, containerID, container)
if err != nil {
return fmt.Errorf("Attempted to pack too large of a Put message.\nContainer length: %d: %w", len(container), err)
}
s.log.Verbo("Sending a Put message to non-validators."+
"\nNumber of Non-Validators: %d"+
"\nChain: %s"+
"\nContainer ID: %s"+
"\nContainer:\n%s",
len(addrs),
chainID,
containerID,
formatting.DumpBytes{Bytes: container},
)
s.send(msg, addrs...)
s.numPutSent.Add(float64(len(addrs)))
return nil
}
// GetAcceptedFrontier implements the Sender interface.
func (s *Voting) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) {
addrs := []salticidae.NetAddr(nil)
validatorIDList := validatorIDs.List()
for _, validatorID := range validatorIDList {
vID := validatorID
if addr, exists := s.conns.GetIP(vID); exists {
addrs = append(addrs, addr)
s.log.Verbo("Sending a GetAcceptedFrontier to %s", toIPDesc(addr))
} else {
s.log.Debug("Attempted to send a GetAcceptedFrontier message to a disconnected validator: %s", vID)
s.executor.Add(func() { s.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
}
}
build := Builder{}
msg, err := build.GetAcceptedFrontier(chainID, requestID)
s.log.AssertNoError(err)
s.log.Verbo("Sending a GetAcceptedFrontier message."+
"\nNumber of Validators: %d"+
"\nChain: %s"+
"\nRequest ID: %d",
len(addrs),
chainID,
requestID,
)
s.send(msg, addrs...)
s.numGetAcceptedFrontierSent.Add(float64(len(addrs)))
}
// AcceptedFrontier implements the Sender interface.
func (s *Voting) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
addr, exists := s.conns.GetIP(validatorID)
if !exists {
s.log.Debug("Attempted to send an AcceptedFrontier message to a disconnected validator: %s", validatorID)
return // Validator is not connected
}
build := Builder{}
msg, err := build.AcceptedFrontier(chainID, requestID, containerIDs)
if err != nil {
s.log.Error("Attempted to pack too large of an AcceptedFrontier message.\nNumber of containerIDs: %d", containerIDs.Len())
return // Packing message failed
}
s.log.Verbo("Sending an AcceptedFrontier message."+
"\nValidator: %s"+
"\nDestination: %s"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nContainer IDs: %s",
validatorID,
toIPDesc(addr),
chainID,
requestID,
containerIDs,
)
s.send(msg, addr)
s.numAcceptedFrontierSent.Inc()
}
// GetAccepted implements the Sender interface.
func (s *Voting) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
addrs := []salticidae.NetAddr(nil)
validatorIDList := validatorIDs.List()
for _, validatorID := range validatorIDList {
vID := validatorID
if addr, exists := s.conns.GetIP(validatorID); exists {
addrs = append(addrs, addr)
s.log.Verbo("Sending a GetAccepted to %s", toIPDesc(addr))
} else {
s.log.Debug("Attempted to send a GetAccepted message to a disconnected validator: %s", vID)
s.executor.Add(func() { s.router.GetAcceptedFailed(vID, chainID, requestID) })
}
}
build := Builder{}
msg, err := build.GetAccepted(chainID, requestID, containerIDs)
if err != nil {
for _, addr := range addrs {
if validatorID, exists := s.conns.GetID(addr); exists {
s.executor.Add(func() { s.router.GetAcceptedFailed(validatorID, chainID, requestID) })
}
}
s.log.Debug("Attempted to pack too large of a GetAccepted message.\nNumber of containerIDs: %d", containerIDs.Len())
return // Packing message failed
}
s.log.Verbo("Sending a GetAccepted message."+
"\nNumber of Validators: %d"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nContainer IDs:%s",
len(addrs),
chainID,
requestID,
containerIDs,
)
s.send(msg, addrs...)
s.numGetAcceptedSent.Add(float64(len(addrs)))
}
// Accepted implements the Sender interface.
func (s *Voting) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
addr, exists := s.conns.GetIP(validatorID)
if !exists {
s.log.Debug("Attempted to send an Accepted message to a disconnected validator: %s", validatorID)
return // Validator is not connected
}
build := Builder{}
msg, err := build.Accepted(chainID, requestID, containerIDs)
if err != nil {
s.log.Error("Attempted to pack too large of an Accepted message.\nNumber of containerIDs: %d", containerIDs.Len())
return // Packing message failed
}
s.log.Verbo("Sending an Accepted message."+
"\nValidator: %s"+
"\nDestination: %s"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nContainer IDs: %s",
validatorID,
toIPDesc(addr),
chainID,
requestID,
containerIDs,
)
s.send(msg, addr)
s.numAcceptedSent.Inc()
}
// Get implements the Sender interface.
func (s *Voting) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
addr, exists := s.conns.GetIP(validatorID)
if !exists {
s.log.Debug("Attempted to send a Get message to a disconnected validator: %s", validatorID)
s.executor.Add(func() { s.router.GetFailed(validatorID, chainID, requestID, containerID) })
return // Validator is not connected
}
build := Builder{}
msg, err := build.Get(chainID, requestID, containerID)
s.log.AssertNoError(err)
s.log.Verbo("Sending a Get message."+
"\nValidator: %s"+
"\nDestination: %s"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nContainer ID: %s",
validatorID,
toIPDesc(addr),
chainID,
requestID,
containerID,
)
s.send(msg, addr)
s.numGetSent.Inc()
}
// Put implements the Sender interface.
func (s *Voting) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
addr, exists := s.conns.GetIP(validatorID)
if !exists {
s.log.Debug("Attempted to send a Container message to a disconnected validator: %s", validatorID)
return // Validator is not connected
}
build := Builder{}
msg, err := build.Put(chainID, requestID, containerID, container)
if err != nil {
s.log.Error("Attempted to pack too large of a Put message.\nContainer length: %d", len(container))
return // Packing message failed
}
s.log.Verbo("Sending a Container message."+
"\nValidator: %s"+
"\nDestination: %s"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nContainer ID: %s"+
"\nContainer:\n%s",
validatorID,
toIPDesc(addr),
chainID,
requestID,
containerID,
formatting.DumpBytes{Bytes: container},
)
s.send(msg, addr)
s.numPutSent.Inc()
}
// PushQuery implements the Sender interface.
func (s *Voting) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
addrs := []salticidae.NetAddr(nil)
validatorIDList := validatorIDs.List()
for _, validatorID := range validatorIDList {
vID := validatorID
if addr, exists := s.conns.GetIP(vID); exists {
addrs = append(addrs, addr)
s.log.Verbo("Sending a PushQuery to %s", toIPDesc(addr))
} else {
s.log.Debug("Attempted to send a PushQuery message to a disconnected validator: %s", vID)
s.executor.Add(func() { s.router.QueryFailed(vID, chainID, requestID) })
}
}
build := Builder{}
msg, err := build.PushQuery(chainID, requestID, containerID, container)
if err != nil {
for _, addr := range addrs {
if validatorID, exists := s.conns.GetID(addr); exists {
s.executor.Add(func() { s.router.QueryFailed(validatorID, chainID, requestID) })
}
}
s.log.Error("Attempted to pack too large of a PushQuery message.\nContainer length: %d", len(container))
return // Packing message failed
}
s.log.Verbo("Sending a PushQuery message."+
"\nNumber of Validators: %d"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nContainer ID: %s"+
"\nContainer:\n%s",
len(addrs),
chainID,
requestID,
containerID,
formatting.DumpBytes{Bytes: container},
)
s.send(msg, addrs...)
s.numPushQuerySent.Add(float64(len(addrs)))
}
// PullQuery implements the Sender interface.
func (s *Voting) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) {
addrs := []salticidae.NetAddr(nil)
validatorIDList := validatorIDs.List()
for _, validatorID := range validatorIDList {
vID := validatorID
if addr, exists := s.conns.GetIP(vID); exists {
addrs = append(addrs, addr)
s.log.Verbo("Sending a PushQuery to %s", toIPDesc(addr))
} else {
s.log.Warn("Attempted to send a PushQuery message to a disconnected validator: %s", vID)
s.executor.Add(func() { s.router.QueryFailed(vID, chainID, requestID) })
}
}
build := Builder{}
msg, err := build.PullQuery(chainID, requestID, containerID)
s.log.AssertNoError(err)
s.log.Verbo("Sending a PullQuery message."+
"\nNumber of Validators: %d"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nContainer ID: %s",
len(addrs),
chainID,
requestID,
containerID,
)
s.send(msg, addrs...)
s.numPullQuerySent.Add(float64(len(addrs)))
}
// Chits implements the Sender interface.
func (s *Voting) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
addr, exists := s.conns.GetIP(validatorID)
if !exists {
s.log.Debug("Attempted to send a Chits message to a disconnected validator: %s", validatorID)
return // Validator is not connected
}
build := Builder{}
msg, err := build.Chits(chainID, requestID, votes)
if err != nil {
s.log.Error("Attempted to pack too large of a Chits message.\nChits length: %d", votes.Len())
return // Packing message failed
}
s.log.Verbo("Sending a Chits message."+
"\nValidator: %s"+
"\nDestination: %s"+
"\nChain: %s"+
"\nRequest ID: %d"+
"\nNumber of Chits: %d",
validatorID,
toIPDesc(addr),
chainID,
requestID,
votes.Len(),
)
s.send(msg, addr)
s.numChitsSent.Inc()
}
func (s *Voting) send(msg Msg, addrs ...salticidae.NetAddr) {
ds := msg.DataStream()
defer ds.Free()
ba := salticidae.NewByteArrayMovedFromDataStream(ds, false)
defer ba.Free()
cMsg := salticidae.NewMsgMovedFromByteArray(msg.Op(), ba, false)
defer cMsg.Free()
switch len(addrs) {
case 0:
case 1:
s.net.SendMsg(cMsg, addrs[0])
default:
s.net.MulticastMsgByMove(cMsg, addrs)
}
}
// getAcceptedFrontier handles the recept of a getAcceptedFrontier container
// message for a chain
//export getAcceptedFrontier
func getAcceptedFrontier(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numGetAcceptedFrontierReceived.Inc()
validatorID, chainID, requestID, _, err := VotingNet.sanitize(_msg, _conn, GetAcceptedFrontier)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
VotingNet.router.GetAcceptedFrontier(validatorID, chainID, requestID)
}
// acceptedFrontier handles the recept of an acceptedFrontier message
//export acceptedFrontier
func acceptedFrontier(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numAcceptedFrontierReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, AcceptedFrontier)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
containerIDs := ids.Set{}
for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
containerID, err := ids.ToID(containerIDBytes)
if err != nil {
VotingNet.log.Warn("Error parsing ContainerID: %v", containerIDBytes)
return
}
containerIDs.Add(containerID)
}
VotingNet.router.AcceptedFrontier(validatorID, chainID, requestID, containerIDs)
}
// getAccepted handles the recept of a getAccepted message
//export getAccepted
func getAccepted(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numGetAcceptedReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, GetAccepted)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
containerIDs := ids.Set{}
for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
containerID, err := ids.ToID(containerIDBytes)
if err != nil {
VotingNet.log.Warn("Error parsing ContainerID: %v", containerIDBytes)
return
}
containerIDs.Add(containerID)
}
VotingNet.router.GetAccepted(validatorID, chainID, requestID, containerIDs)
}
// accepted handles the recept of an accepted message
//export accepted
func accepted(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numAcceptedReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Accepted)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
containerIDs := ids.Set{}
for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
containerID, err := ids.ToID(containerIDBytes)
if err != nil {
VotingNet.log.Warn("Error parsing ContainerID: %v", containerIDBytes)
return
}
containerIDs.Add(containerID)
}
VotingNet.router.Accepted(validatorID, chainID, requestID, containerIDs)
}
// get handles the recept of a get container message for a chain
//export get
func get(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numGetReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Get)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
VotingNet.router.Get(validatorID, chainID, requestID, containerID)
}
// put handles the receipt of a container message
//export put
func put(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numPutReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Put)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
containerBytes := msg.Get(ContainerBytes).([]byte)
VotingNet.router.Put(validatorID, chainID, requestID, containerID, containerBytes)
}
// pushQuery handles the recept of a pull query message
//export pushQuery
func pushQuery(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numPushQueryReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, PushQuery)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
containerBytes := msg.Get(ContainerBytes).([]byte)
VotingNet.router.PushQuery(validatorID, chainID, requestID, containerID, containerBytes)
}
// pullQuery handles the recept of a query message
//export pullQuery
func pullQuery(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numPullQueryReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, PullQuery)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
VotingNet.router.PullQuery(validatorID, chainID, requestID, containerID)
}
// chits handles the recept of a chits message
//export chits
func chits(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
VotingNet.numChitsReceived.Inc()
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Chits)
if err != nil {
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
return
}
votes := ids.Set{}
for _, voteBytes := range msg.Get(ContainerIDs).([][]byte) {
vote, err := ids.ToID(voteBytes)
if err != nil {
VotingNet.log.Warn("Error parsing chit: %v", voteBytes)
return
}
votes.Add(vote)
}
VotingNet.router.Chits(validatorID, chainID, requestID, votes)
}
func (s *Voting) sanitize(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, op salticidae.Opcode) (ids.ShortID, ids.ID, uint32, Msg, error) {
conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn((*C.peernetwork_conn_t)(_conn)))
addr := conn.GetPeerAddr(false)
defer addr.Free()
if addr.IsNull() {
return ids.ShortID{}, ids.ID{}, 0, nil, errConnectionDropped
}
s.log.Verbo("Receiving message from %s", toIPDesc(addr))
validatorID, exists := s.conns.GetID(addr)
if !exists {
return ids.ShortID{}, ids.ID{}, 0, nil, fmt.Errorf("message received from an un-registered source: %s", toIPDesc(addr))
}
msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
codec := Codec{}
pMsg, err := codec.Parse(op, msg.GetPayloadByMove())
if err != nil {
return ids.ShortID{}, ids.ID{}, 0, nil, err // The message couldn't be parsed
}
chainID, err := ids.ToID(pMsg.Get(ChainID).([]byte))
s.log.AssertNoError(err)
requestID := pMsg.Get(RequestID).(uint32)
return validatorID, chainID, requestID, pMsg, nil
}