mirror of https://github.com/poanetwork/gecko.git
649 lines
20 KiB
Go
649 lines
20 KiB
Go
|
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
|
||
|
// See the file LICENSE for licensing terms.
|
||
|
|
||
|
package networking
|
||
|
|
||
|
// #include "salticidae/network.h"
|
||
|
// void getAcceptedFrontier(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void acceptedFrontier(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void getAccepted(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void accepted(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void get(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void put(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void pushQuery(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void pullQuery(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
// void chits(msg_t *, msgnetwork_conn_t *, void *);
|
||
|
import "C"
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"unsafe"
|
||
|
|
||
|
"github.com/prometheus/client_golang/prometheus"
|
||
|
|
||
|
"github.com/ava-labs/salticidae-go"
|
||
|
|
||
|
"github.com/ava-labs/gecko/ids"
|
||
|
"github.com/ava-labs/gecko/snow/networking/router"
|
||
|
"github.com/ava-labs/gecko/snow/validators"
|
||
|
"github.com/ava-labs/gecko/utils/formatting"
|
||
|
"github.com/ava-labs/gecko/utils/logging"
|
||
|
"github.com/ava-labs/gecko/utils/timer"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// VotingNet implements the SenderExternal interface.
|
||
|
VotingNet = Voting{}
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
errConnectionDropped = errors.New("connection dropped before receiving message")
|
||
|
)
|
||
|
|
||
|
// Voting implements the SenderExternal interface with a c++ library.
|
||
|
type Voting struct {
|
||
|
votingMetrics
|
||
|
|
||
|
log logging.Logger
|
||
|
vdrs validators.Set
|
||
|
net salticidae.PeerNetwork
|
||
|
conns Connections
|
||
|
|
||
|
router router.Router
|
||
|
executor timer.Executor
|
||
|
}
|
||
|
|
||
|
// Initialize to the c networking library. Should only be called once ever.
|
||
|
func (s *Voting) Initialize(log logging.Logger, vdrs validators.Set, peerNet salticidae.PeerNetwork, conns Connections, router router.Router, registerer prometheus.Registerer) {
|
||
|
log.AssertTrue(s.net == nil, "Should only register network handlers once")
|
||
|
log.AssertTrue(s.conns == nil, "Should only set connections once")
|
||
|
log.AssertTrue(s.router == nil, "Should only set the router once")
|
||
|
|
||
|
s.log = log
|
||
|
s.vdrs = vdrs
|
||
|
s.net = peerNet
|
||
|
s.conns = conns
|
||
|
s.router = router
|
||
|
|
||
|
s.votingMetrics.Initialize(log, registerer)
|
||
|
|
||
|
net := peerNet.AsMsgNetwork()
|
||
|
|
||
|
net.RegHandler(GetAcceptedFrontier, salticidae.MsgNetworkMsgCallback(C.getAcceptedFrontier), nil)
|
||
|
net.RegHandler(AcceptedFrontier, salticidae.MsgNetworkMsgCallback(C.acceptedFrontier), nil)
|
||
|
net.RegHandler(GetAccepted, salticidae.MsgNetworkMsgCallback(C.getAccepted), nil)
|
||
|
net.RegHandler(Accepted, salticidae.MsgNetworkMsgCallback(C.accepted), nil)
|
||
|
net.RegHandler(Get, salticidae.MsgNetworkMsgCallback(C.get), nil)
|
||
|
net.RegHandler(Put, salticidae.MsgNetworkMsgCallback(C.put), nil)
|
||
|
net.RegHandler(PushQuery, salticidae.MsgNetworkMsgCallback(C.pushQuery), nil)
|
||
|
net.RegHandler(PullQuery, salticidae.MsgNetworkMsgCallback(C.pullQuery), nil)
|
||
|
net.RegHandler(Chits, salticidae.MsgNetworkMsgCallback(C.chits), nil)
|
||
|
|
||
|
s.executor.Initialize()
|
||
|
go log.RecoverAndPanic(s.executor.Dispatch)
|
||
|
}
|
||
|
|
||
|
// Shutdown threads
|
||
|
func (s *Voting) Shutdown() { s.executor.Stop() }
|
||
|
|
||
|
// Accept is called after every consensus decision
|
||
|
func (s *Voting) Accept(chainID, containerID ids.ID, container []byte) error {
|
||
|
addrs := []salticidae.NetAddr(nil)
|
||
|
|
||
|
allAddrs, allIDs := s.conns.RawConns()
|
||
|
for i, id := range allIDs {
|
||
|
if !s.vdrs.Contains(id) {
|
||
|
addrs = append(addrs, allAddrs[i])
|
||
|
}
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.Put(chainID, 0, containerID, container)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Attempted to pack too large of a Put message.\nContainer length: %d: %w", len(container), err)
|
||
|
}
|
||
|
|
||
|
s.log.Verbo("Sending a Put message to non-validators."+
|
||
|
"\nNumber of Non-Validators: %d"+
|
||
|
"\nChain: %s"+
|
||
|
"\nContainer ID: %s"+
|
||
|
"\nContainer:\n%s",
|
||
|
len(addrs),
|
||
|
chainID,
|
||
|
containerID,
|
||
|
formatting.DumpBytes{Bytes: container},
|
||
|
)
|
||
|
s.send(msg, addrs...)
|
||
|
s.numPutSent.Add(float64(len(addrs)))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// GetAcceptedFrontier implements the Sender interface.
|
||
|
func (s *Voting) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) {
|
||
|
addrs := []salticidae.NetAddr(nil)
|
||
|
validatorIDList := validatorIDs.List()
|
||
|
for _, validatorID := range validatorIDList {
|
||
|
vID := validatorID
|
||
|
if addr, exists := s.conns.GetIP(vID); exists {
|
||
|
addrs = append(addrs, addr)
|
||
|
s.log.Verbo("Sending a GetAcceptedFrontier to %s", toIPDesc(addr))
|
||
|
} else {
|
||
|
s.log.Debug("Attempted to send a GetAcceptedFrontier message to a disconnected validator: %s", vID)
|
||
|
s.executor.Add(func() { s.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
|
||
|
}
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.GetAcceptedFrontier(chainID, requestID)
|
||
|
s.log.AssertNoError(err)
|
||
|
|
||
|
s.log.Verbo("Sending a GetAcceptedFrontier message."+
|
||
|
"\nNumber of Validators: %d"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d",
|
||
|
len(addrs),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
)
|
||
|
s.send(msg, addrs...)
|
||
|
s.numGetAcceptedFrontierSent.Add(float64(len(addrs)))
|
||
|
}
|
||
|
|
||
|
// AcceptedFrontier implements the Sender interface.
|
||
|
func (s *Voting) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
||
|
addr, exists := s.conns.GetIP(validatorID)
|
||
|
if !exists {
|
||
|
s.log.Debug("Attempted to send an AcceptedFrontier message to a disconnected validator: %s", validatorID)
|
||
|
return // Validator is not connected
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.AcceptedFrontier(chainID, requestID, containerIDs)
|
||
|
if err != nil {
|
||
|
s.log.Error("Attempted to pack too large of an AcceptedFrontier message.\nNumber of containerIDs: %d", containerIDs.Len())
|
||
|
return // Packing message failed
|
||
|
}
|
||
|
|
||
|
s.log.Verbo("Sending an AcceptedFrontier message."+
|
||
|
"\nValidator: %s"+
|
||
|
"\nDestination: %s"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nContainer IDs: %s",
|
||
|
validatorID,
|
||
|
toIPDesc(addr),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
containerIDs,
|
||
|
)
|
||
|
s.send(msg, addr)
|
||
|
s.numAcceptedFrontierSent.Inc()
|
||
|
}
|
||
|
|
||
|
// GetAccepted implements the Sender interface.
|
||
|
func (s *Voting) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
||
|
addrs := []salticidae.NetAddr(nil)
|
||
|
validatorIDList := validatorIDs.List()
|
||
|
for _, validatorID := range validatorIDList {
|
||
|
vID := validatorID
|
||
|
if addr, exists := s.conns.GetIP(validatorID); exists {
|
||
|
addrs = append(addrs, addr)
|
||
|
s.log.Verbo("Sending a GetAccepted to %s", toIPDesc(addr))
|
||
|
} else {
|
||
|
s.log.Debug("Attempted to send a GetAccepted message to a disconnected validator: %s", vID)
|
||
|
s.executor.Add(func() { s.router.GetAcceptedFailed(vID, chainID, requestID) })
|
||
|
}
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.GetAccepted(chainID, requestID, containerIDs)
|
||
|
if err != nil {
|
||
|
for _, addr := range addrs {
|
||
|
if validatorID, exists := s.conns.GetID(addr); exists {
|
||
|
s.executor.Add(func() { s.router.GetAcceptedFailed(validatorID, chainID, requestID) })
|
||
|
}
|
||
|
}
|
||
|
s.log.Debug("Attempted to pack too large of a GetAccepted message.\nNumber of containerIDs: %d", containerIDs.Len())
|
||
|
return // Packing message failed
|
||
|
}
|
||
|
|
||
|
s.log.Verbo("Sending a GetAccepted message."+
|
||
|
"\nNumber of Validators: %d"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nContainer IDs:%s",
|
||
|
len(addrs),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
containerIDs,
|
||
|
)
|
||
|
s.send(msg, addrs...)
|
||
|
s.numGetAcceptedSent.Add(float64(len(addrs)))
|
||
|
}
|
||
|
|
||
|
// Accepted implements the Sender interface.
|
||
|
func (s *Voting) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
||
|
addr, exists := s.conns.GetIP(validatorID)
|
||
|
if !exists {
|
||
|
s.log.Debug("Attempted to send an Accepted message to a disconnected validator: %s", validatorID)
|
||
|
return // Validator is not connected
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.Accepted(chainID, requestID, containerIDs)
|
||
|
if err != nil {
|
||
|
s.log.Error("Attempted to pack too large of an Accepted message.\nNumber of containerIDs: %d", containerIDs.Len())
|
||
|
return // Packing message failed
|
||
|
}
|
||
|
|
||
|
s.log.Verbo("Sending an Accepted message."+
|
||
|
"\nValidator: %s"+
|
||
|
"\nDestination: %s"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nContainer IDs: %s",
|
||
|
validatorID,
|
||
|
toIPDesc(addr),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
containerIDs,
|
||
|
)
|
||
|
s.send(msg, addr)
|
||
|
s.numAcceptedSent.Inc()
|
||
|
}
|
||
|
|
||
|
// Get implements the Sender interface.
|
||
|
func (s *Voting) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
||
|
addr, exists := s.conns.GetIP(validatorID)
|
||
|
if !exists {
|
||
|
s.log.Debug("Attempted to send a Get message to a disconnected validator: %s", validatorID)
|
||
|
s.executor.Add(func() { s.router.GetFailed(validatorID, chainID, requestID, containerID) })
|
||
|
return // Validator is not connected
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.Get(chainID, requestID, containerID)
|
||
|
s.log.AssertNoError(err)
|
||
|
|
||
|
s.log.Verbo("Sending a Get message."+
|
||
|
"\nValidator: %s"+
|
||
|
"\nDestination: %s"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nContainer ID: %s",
|
||
|
validatorID,
|
||
|
toIPDesc(addr),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
containerID,
|
||
|
)
|
||
|
s.send(msg, addr)
|
||
|
s.numGetSent.Inc()
|
||
|
}
|
||
|
|
||
|
// Put implements the Sender interface.
|
||
|
func (s *Voting) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
|
||
|
addr, exists := s.conns.GetIP(validatorID)
|
||
|
if !exists {
|
||
|
s.log.Debug("Attempted to send a Container message to a disconnected validator: %s", validatorID)
|
||
|
return // Validator is not connected
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.Put(chainID, requestID, containerID, container)
|
||
|
if err != nil {
|
||
|
s.log.Error("Attempted to pack too large of a Put message.\nContainer length: %d", len(container))
|
||
|
return // Packing message failed
|
||
|
}
|
||
|
|
||
|
s.log.Verbo("Sending a Container message."+
|
||
|
"\nValidator: %s"+
|
||
|
"\nDestination: %s"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nContainer ID: %s"+
|
||
|
"\nContainer:\n%s",
|
||
|
validatorID,
|
||
|
toIPDesc(addr),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
containerID,
|
||
|
formatting.DumpBytes{Bytes: container},
|
||
|
)
|
||
|
s.send(msg, addr)
|
||
|
s.numPutSent.Inc()
|
||
|
}
|
||
|
|
||
|
// PushQuery implements the Sender interface.
|
||
|
func (s *Voting) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
|
||
|
addrs := []salticidae.NetAddr(nil)
|
||
|
validatorIDList := validatorIDs.List()
|
||
|
for _, validatorID := range validatorIDList {
|
||
|
vID := validatorID
|
||
|
if addr, exists := s.conns.GetIP(vID); exists {
|
||
|
addrs = append(addrs, addr)
|
||
|
s.log.Verbo("Sending a PushQuery to %s", toIPDesc(addr))
|
||
|
} else {
|
||
|
s.log.Debug("Attempted to send a PushQuery message to a disconnected validator: %s", vID)
|
||
|
s.executor.Add(func() { s.router.QueryFailed(vID, chainID, requestID) })
|
||
|
}
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.PushQuery(chainID, requestID, containerID, container)
|
||
|
if err != nil {
|
||
|
for _, addr := range addrs {
|
||
|
if validatorID, exists := s.conns.GetID(addr); exists {
|
||
|
s.executor.Add(func() { s.router.QueryFailed(validatorID, chainID, requestID) })
|
||
|
}
|
||
|
}
|
||
|
s.log.Error("Attempted to pack too large of a PushQuery message.\nContainer length: %d", len(container))
|
||
|
return // Packing message failed
|
||
|
}
|
||
|
|
||
|
s.log.Verbo("Sending a PushQuery message."+
|
||
|
"\nNumber of Validators: %d"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nContainer ID: %s"+
|
||
|
"\nContainer:\n%s",
|
||
|
len(addrs),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
containerID,
|
||
|
formatting.DumpBytes{Bytes: container},
|
||
|
)
|
||
|
s.send(msg, addrs...)
|
||
|
s.numPushQuerySent.Add(float64(len(addrs)))
|
||
|
}
|
||
|
|
||
|
// PullQuery implements the Sender interface.
|
||
|
func (s *Voting) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
||
|
addrs := []salticidae.NetAddr(nil)
|
||
|
validatorIDList := validatorIDs.List()
|
||
|
for _, validatorID := range validatorIDList {
|
||
|
vID := validatorID
|
||
|
if addr, exists := s.conns.GetIP(vID); exists {
|
||
|
addrs = append(addrs, addr)
|
||
|
s.log.Verbo("Sending a PushQuery to %s", toIPDesc(addr))
|
||
|
} else {
|
||
|
s.log.Warn("Attempted to send a PushQuery message to a disconnected validator: %s", vID)
|
||
|
s.executor.Add(func() { s.router.QueryFailed(vID, chainID, requestID) })
|
||
|
}
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.PullQuery(chainID, requestID, containerID)
|
||
|
s.log.AssertNoError(err)
|
||
|
|
||
|
s.log.Verbo("Sending a PullQuery message."+
|
||
|
"\nNumber of Validators: %d"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nContainer ID: %s",
|
||
|
len(addrs),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
containerID,
|
||
|
)
|
||
|
s.send(msg, addrs...)
|
||
|
s.numPullQuerySent.Add(float64(len(addrs)))
|
||
|
}
|
||
|
|
||
|
// Chits implements the Sender interface.
|
||
|
func (s *Voting) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
|
||
|
addr, exists := s.conns.GetIP(validatorID)
|
||
|
if !exists {
|
||
|
s.log.Debug("Attempted to send a Chits message to a disconnected validator: %s", validatorID)
|
||
|
return // Validator is not connected
|
||
|
}
|
||
|
|
||
|
build := Builder{}
|
||
|
msg, err := build.Chits(chainID, requestID, votes)
|
||
|
if err != nil {
|
||
|
s.log.Error("Attempted to pack too large of a Chits message.\nChits length: %d", votes.Len())
|
||
|
return // Packing message failed
|
||
|
}
|
||
|
|
||
|
s.log.Verbo("Sending a Chits message."+
|
||
|
"\nValidator: %s"+
|
||
|
"\nDestination: %s"+
|
||
|
"\nChain: %s"+
|
||
|
"\nRequest ID: %d"+
|
||
|
"\nNumber of Chits: %d",
|
||
|
validatorID,
|
||
|
toIPDesc(addr),
|
||
|
chainID,
|
||
|
requestID,
|
||
|
votes.Len(),
|
||
|
)
|
||
|
s.send(msg, addr)
|
||
|
s.numChitsSent.Inc()
|
||
|
}
|
||
|
|
||
|
func (s *Voting) send(msg Msg, addrs ...salticidae.NetAddr) {
|
||
|
ds := msg.DataStream()
|
||
|
defer ds.Free()
|
||
|
ba := salticidae.NewByteArrayMovedFromDataStream(ds, false)
|
||
|
defer ba.Free()
|
||
|
cMsg := salticidae.NewMsgMovedFromByteArray(msg.Op(), ba, false)
|
||
|
defer cMsg.Free()
|
||
|
|
||
|
switch len(addrs) {
|
||
|
case 0:
|
||
|
case 1:
|
||
|
s.net.SendMsg(cMsg, addrs[0])
|
||
|
default:
|
||
|
s.net.MulticastMsgByMove(cMsg, addrs)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// getAcceptedFrontier handles the recept of a getAcceptedFrontier container
|
||
|
// message for a chain
|
||
|
//export getAcceptedFrontier
|
||
|
func getAcceptedFrontier(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numGetAcceptedFrontierReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, _, err := VotingNet.sanitize(_msg, _conn, GetAcceptedFrontier)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
VotingNet.router.GetAcceptedFrontier(validatorID, chainID, requestID)
|
||
|
}
|
||
|
|
||
|
// acceptedFrontier handles the recept of an acceptedFrontier message
|
||
|
//export acceptedFrontier
|
||
|
func acceptedFrontier(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numAcceptedFrontierReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, AcceptedFrontier)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
containerIDs := ids.Set{}
|
||
|
for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
|
||
|
containerID, err := ids.ToID(containerIDBytes)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Warn("Error parsing ContainerID: %v", containerIDBytes)
|
||
|
return
|
||
|
}
|
||
|
containerIDs.Add(containerID)
|
||
|
}
|
||
|
|
||
|
VotingNet.router.AcceptedFrontier(validatorID, chainID, requestID, containerIDs)
|
||
|
}
|
||
|
|
||
|
// getAccepted handles the recept of a getAccepted message
|
||
|
//export getAccepted
|
||
|
func getAccepted(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numGetAcceptedReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, GetAccepted)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
containerIDs := ids.Set{}
|
||
|
for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
|
||
|
containerID, err := ids.ToID(containerIDBytes)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Warn("Error parsing ContainerID: %v", containerIDBytes)
|
||
|
return
|
||
|
}
|
||
|
containerIDs.Add(containerID)
|
||
|
}
|
||
|
|
||
|
VotingNet.router.GetAccepted(validatorID, chainID, requestID, containerIDs)
|
||
|
}
|
||
|
|
||
|
// accepted handles the recept of an accepted message
|
||
|
//export accepted
|
||
|
func accepted(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numAcceptedReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Accepted)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
containerIDs := ids.Set{}
|
||
|
for _, containerIDBytes := range msg.Get(ContainerIDs).([][]byte) {
|
||
|
containerID, err := ids.ToID(containerIDBytes)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Warn("Error parsing ContainerID: %v", containerIDBytes)
|
||
|
return
|
||
|
}
|
||
|
containerIDs.Add(containerID)
|
||
|
}
|
||
|
|
||
|
VotingNet.router.Accepted(validatorID, chainID, requestID, containerIDs)
|
||
|
}
|
||
|
|
||
|
// get handles the recept of a get container message for a chain
|
||
|
//export get
|
||
|
func get(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numGetReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Get)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
|
||
|
|
||
|
VotingNet.router.Get(validatorID, chainID, requestID, containerID)
|
||
|
}
|
||
|
|
||
|
// put handles the receipt of a container message
|
||
|
//export put
|
||
|
func put(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numPutReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Put)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
|
||
|
|
||
|
containerBytes := msg.Get(ContainerBytes).([]byte)
|
||
|
|
||
|
VotingNet.router.Put(validatorID, chainID, requestID, containerID, containerBytes)
|
||
|
}
|
||
|
|
||
|
// pushQuery handles the recept of a pull query message
|
||
|
//export pushQuery
|
||
|
func pushQuery(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numPushQueryReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, PushQuery)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
|
||
|
|
||
|
containerBytes := msg.Get(ContainerBytes).([]byte)
|
||
|
|
||
|
VotingNet.router.PushQuery(validatorID, chainID, requestID, containerID, containerBytes)
|
||
|
}
|
||
|
|
||
|
// pullQuery handles the recept of a query message
|
||
|
//export pullQuery
|
||
|
func pullQuery(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numPullQueryReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, PullQuery)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
containerID, _ := ids.ToID(msg.Get(ContainerID).([]byte))
|
||
|
|
||
|
VotingNet.router.PullQuery(validatorID, chainID, requestID, containerID)
|
||
|
}
|
||
|
|
||
|
// chits handles the recept of a chits message
|
||
|
//export chits
|
||
|
func chits(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
|
||
|
VotingNet.numChitsReceived.Inc()
|
||
|
|
||
|
validatorID, chainID, requestID, msg, err := VotingNet.sanitize(_msg, _conn, Chits)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Error("Failed to sanitize message due to: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
votes := ids.Set{}
|
||
|
for _, voteBytes := range msg.Get(ContainerIDs).([][]byte) {
|
||
|
vote, err := ids.ToID(voteBytes)
|
||
|
if err != nil {
|
||
|
VotingNet.log.Warn("Error parsing chit: %v", voteBytes)
|
||
|
return
|
||
|
}
|
||
|
votes.Add(vote)
|
||
|
}
|
||
|
|
||
|
VotingNet.router.Chits(validatorID, chainID, requestID, votes)
|
||
|
}
|
||
|
|
||
|
func (s *Voting) sanitize(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, op salticidae.Opcode) (ids.ShortID, ids.ID, uint32, Msg, error) {
|
||
|
conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn((*C.peernetwork_conn_t)(_conn)))
|
||
|
addr := conn.GetPeerAddr(false)
|
||
|
defer addr.Free()
|
||
|
if addr.IsNull() {
|
||
|
return ids.ShortID{}, ids.ID{}, 0, nil, errConnectionDropped
|
||
|
}
|
||
|
s.log.Verbo("Receiving message from %s", toIPDesc(addr))
|
||
|
|
||
|
validatorID, exists := s.conns.GetID(addr)
|
||
|
if !exists {
|
||
|
return ids.ShortID{}, ids.ID{}, 0, nil, fmt.Errorf("message received from an un-registered source: %s", toIPDesc(addr))
|
||
|
}
|
||
|
|
||
|
msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
|
||
|
codec := Codec{}
|
||
|
pMsg, err := codec.Parse(op, msg.GetPayloadByMove())
|
||
|
if err != nil {
|
||
|
return ids.ShortID{}, ids.ID{}, 0, nil, err // The message couldn't be parsed
|
||
|
}
|
||
|
|
||
|
chainID, err := ids.ToID(pMsg.Get(ChainID).([]byte))
|
||
|
s.log.AssertNoError(err)
|
||
|
|
||
|
requestID := pMsg.Get(RequestID).(uint32)
|
||
|
|
||
|
return validatorID, chainID, requestID, pMsg, nil
|
||
|
}
|