Merge pull request #220 from ava-labs/improve-network-logging

Improve network logging
This commit is contained in:
Stephen Buttolph 2020-06-15 16:00:52 -04:00 committed by GitHub
commit 6abf3a8c02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 154 additions and 75 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/ava-labs/gecko/snow/triggers"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/random"
"github.com/ava-labs/gecko/utils/timer"
@ -278,8 +279,11 @@ func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID,
func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.AcceptedFrontier(chainID, requestID, containerIDs)
if err != nil {
n.log.Error("attempted to pack too large of an AcceptedFrontier message.\nNumber of containerIDs: %d",
containerIDs.Len())
n.log.Error("failed to build AcceptedFrontier(%s, %d, %s): %s",
chainID,
requestID,
containerIDs,
err)
return // Packing message failed
}
@ -291,7 +295,11 @@ func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requ
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send an AcceptedFrontier message to: %s", validatorID)
n.log.Debug("failed to send AcceptedFrontier(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerIDs)
n.acceptedFrontier.numFailed.Inc()
} else {
n.acceptedFrontier.numSent.Inc()
@ -302,6 +310,11 @@ func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requ
func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.GetAccepted(chainID, requestID, containerIDs)
if err != nil {
n.log.Error("failed to build GetAccepted(%s, %d, %s): %s",
chainID,
requestID,
containerIDs,
err)
for _, validatorID := range validatorIDs.List() {
vID := validatorID
n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
@ -319,6 +332,11 @@ func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, request
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send GetAccepted(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerIDs)
n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
n.getAccepted.numFailed.Inc()
} else {
@ -331,8 +349,11 @@ func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, request
func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.Accepted(chainID, requestID, containerIDs)
if err != nil {
n.log.Error("attempted to pack too large of an Accepted message.\nNumber of containerIDs: %d",
containerIDs.Len())
n.log.Error("failed to build Accepted(%s, %d, %s): %s",
chainID,
requestID,
containerIDs,
err)
return // Packing message failed
}
@ -344,33 +365,17 @@ func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID ui
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send an Accepted message to: %s", validatorID)
n.log.Debug("failed to send Accepted(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerIDs)
n.accepted.numFailed.Inc()
} else {
n.accepted.numSent.Inc()
}
}
// Get implements the Sender interface.
func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.Get(chainID, requestID, containerID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a Get message to: %s", validatorID)
n.get.numFailed.Inc()
} else {
n.get.numSent.Inc()
}
}
// GetAncestors implements the Sender interface.
func (n *network) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.GetAncestors(chainID, requestID, containerID)
@ -387,36 +392,18 @@ func (n *network) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestI
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send GetAncestors(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerID)
n.executor.Add(func() { n.router.GetAncestorsFailed(validatorID, chainID, requestID) })
n.getAncestors.numFailed.Inc()
n.log.Debug("failed to send a GetAncestors message to: %s", validatorID)
} else {
n.getAncestors.numSent.Inc()
}
}
// Put implements the Sender interface.
func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.Put(chainID, requestID, containerID, container)
if err != nil {
n.log.Error("failed to build Put message because of container of size %d", len(container))
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a Put message to: %s", validatorID)
n.put.numFailed.Inc()
} else {
n.put.numSent.Inc()
}
}
// MultiPut implements the Sender interface.
func (n *network) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) {
msg, err := n.b.MultiPut(chainID, requestID, containers)
@ -433,22 +420,90 @@ func (n *network) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID ui
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a MultiPut message to: %s", validatorID)
n.log.Debug("failed to send MultiPut(%s, %s, %d, %d)",
validatorID,
chainID,
requestID,
len(containers))
n.multiPut.numFailed.Inc()
} else {
n.multiPut.numSent.Inc()
}
}
// Get implements the Sender interface.
func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.Get(chainID, requestID, containerID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send Get(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerID)
n.executor.Add(func() { n.router.GetFailed(validatorID, chainID, requestID) })
n.get.numFailed.Inc()
} else {
n.get.numSent.Inc()
}
}
// Put implements the Sender interface.
func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.Put(chainID, requestID, containerID, container)
if err != nil {
n.log.Error("failed to build Put(%s, %d, %s): %s. len(container) : %d",
chainID,
requestID,
containerID,
err,
len(container))
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send Put(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerID)
n.log.Verbo("container: %s", formatting.DumpBytes{Bytes: container})
n.put.numFailed.Inc()
} else {
n.put.numSent.Inc()
}
}
// PushQuery implements the Sender interface.
func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.PushQuery(chainID, requestID, containerID, container)
if err != nil {
n.log.Error("failed to build PushQuery(%s, %d, %s): %s. len(container): %d",
chainID,
requestID,
containerID,
err,
len(container))
n.log.Verbo("container: %s", formatting.DumpBytes{Bytes: container})
for _, validatorID := range validatorIDs.List() {
vID := validatorID
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
}
n.log.Error("attempted to pack too large of a PushQuery message.\nContainer length: %d", len(container))
return // Packing message failed
}
@ -462,7 +517,12 @@ func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed sending a PushQuery message to: %s", vID)
n.log.Debug("failed to send PushQuery(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerID)
n.log.Verbo("container: %s", formatting.DumpBytes{Bytes: container})
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
n.pushQuery.numFailed.Inc()
} else {
@ -486,7 +546,11 @@ func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed sending a PullQuery message to: %s", vID)
n.log.Debug("failed to send PullQuery(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
containerID)
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
n.pullQuery.numFailed.Inc()
} else {
@ -499,7 +563,11 @@ func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID
func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
msg, err := n.b.Chits(chainID, requestID, votes)
if err != nil {
n.log.Error("failed to build Chits message because of %d votes", votes.Len())
n.log.Error("failed to build Chits(%s, %d, %s): %s",
chainID,
requestID,
votes,
err)
return
}
@ -511,7 +579,11 @@ func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint3
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a Chits message to: %s", validatorID)
n.log.Debug("failed to send Chits(%s, %s, %d, %s)",
validatorID,
chainID,
requestID,
votes)
n.chits.numFailed.Inc()
} else {
n.chits.numSent.Inc()
@ -521,7 +593,8 @@ func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint3
// Gossip attempts to gossip the container to the network
func (n *network) Gossip(chainID, containerID ids.ID, container []byte) {
if err := n.gossipContainer(chainID, containerID, container); err != nil {
n.log.Error("error gossiping container %s to %s: %s", containerID, chainID, err)
n.log.Debug("failed to Gossip(%s, %s): %s", chainID, containerID, err)
n.log.Verbo("container:\n%s", formatting.DumpBytes{Bytes: container})
}
}
@ -695,7 +768,9 @@ func (n *network) gossip() {
}
msg, err := n.b.PeerList(ips)
if err != nil {
n.log.Warn("failed to gossip PeerList message due to %s", err)
n.log.Error("failed to build peer list to gossip: %s. len(ips): %d",
err,
len(ips))
continue
}

View File

@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/timeout"
"github.com/ava-labs/gecko/utils/logging"
@ -67,7 +69,7 @@ func (sr *ChainRouter) RemoveChain(chainID ids.ID) {
sr.lock.RLock()
chain, exists := sr.chains[chainID.Key()]
if !exists {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("can't remove unknown chain %s", chainID)
sr.lock.RUnlock()
return
}
@ -95,7 +97,7 @@ func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids.
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.GetAcceptedFrontier(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("GetAcceptedFrontier(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
}
@ -111,7 +113,7 @@ func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID,
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("AcceptedFrontier(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, containerIDs)
}
}
@ -132,7 +134,7 @@ func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainI
return
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Error("GetAcceptedFrontierFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
@ -147,7 +149,7 @@ func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requ
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.GetAccepted(validatorID, requestID, containerIDs)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("GetAccepted(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, containerIDs)
}
}
@ -163,7 +165,7 @@ func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, request
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("Accepted(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, containerIDs)
}
}
@ -183,7 +185,7 @@ func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID
return
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Error("GetAcceptedFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
@ -198,7 +200,7 @@ func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, req
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.GetAncestors(validatorID, requestID, containerID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("GetAncestors(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
}
@ -215,7 +217,7 @@ func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, request
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("MultiPut(%s, %s, %d, %d) dropped due to unknown chain", validatorID, chainID, requestID, len(containers))
}
}
@ -234,7 +236,7 @@ func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.I
return
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Error("GetAncestorsFailed(%s, %s, %d, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
@ -248,7 +250,7 @@ func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID ui
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.Get(validatorID, requestID, containerID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("Get(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, containerID)
}
}
@ -265,7 +267,8 @@ func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID ui
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("Put(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, containerID)
sr.log.Verbo("container:\n%s", formatting.DumpBytes{Bytes: container})
}
}
@ -284,7 +287,7 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques
return
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Error("GetFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
@ -298,7 +301,8 @@ func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, reques
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.PushQuery(validatorID, requestID, containerID, container)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("PushQuery(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, containerID)
sr.log.Verbo("container:\n%s", formatting.DumpBytes{Bytes: container})
}
}
@ -311,7 +315,7 @@ func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, reques
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.PullQuery(validatorID, requestID, containerID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("PullQuery(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, containerID)
}
}
@ -327,7 +331,7 @@ func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Debug("Chits(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID, votes)
}
}
@ -346,7 +350,7 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ
return
}
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
sr.log.Error("QueryFailed(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}