Merge pull request #23 from ava-labs/latency

Latency
This commit is contained in:
Stephen Buttolph 2020-04-04 13:42:31 -04:00 committed by GitHub
commit 5363014e0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 330 additions and 202 deletions

View File

@ -0,0 +1,87 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package avalanche
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
)
type metrics struct {
numProcessing prometheus.Gauge
latAccepted, latRejected prometheus.Histogram
clock timer.Clock
processing map[[32]byte]time.Time
}
// Initialize implements the Engine interface
func (m *metrics) Initialize(log logging.Logger, namespace string, registerer prometheus.Registerer) error {
m.processing = make(map[[32]byte]time.Time)
m.numProcessing = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "vtx_processing",
Help: "Number of currently processing vertices",
})
m.latAccepted = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "vtx_accepted",
Help: "Latency of accepting from the time the vertex was issued in milliseconds",
Buckets: timer.Buckets,
})
m.latRejected = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "vtx_rejected",
Help: "Latency of rejecting from the time the vertex was issued in milliseconds",
Buckets: timer.Buckets,
})
if err := registerer.Register(m.numProcessing); err != nil {
return fmt.Errorf("Failed to register vtx_processing statistics due to %w", err)
}
if err := registerer.Register(m.latAccepted); err != nil {
return fmt.Errorf("Failed to register vtx_accepted statistics due to %w", err)
}
if err := registerer.Register(m.latRejected); err != nil {
return fmt.Errorf("Failed to register vtx_rejected statistics due to %w", err)
}
return nil
}
func (m *metrics) Issued(id ids.ID) {
m.processing[id.Key()] = m.clock.Time()
m.numProcessing.Inc()
}
func (m *metrics) Accepted(id ids.ID) {
key := id.Key()
start := m.processing[key]
end := m.clock.Time()
delete(m.processing, key)
m.latAccepted.Observe(float64(end.Sub(start).Milliseconds()))
m.numProcessing.Dec()
}
func (m *metrics) Rejected(id ids.ID) {
key := id.Key()
start := m.processing[key]
end := m.clock.Time()
delete(m.processing, key)
m.latRejected.Observe(float64(end.Sub(start).Milliseconds()))
m.numProcessing.Dec()
}

View File

@ -4,8 +4,6 @@
package avalanche
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/choices"
@ -28,14 +26,13 @@ func (TopologicalFactory) New() Consensus { return &Topological{} }
// of the voting results. Assumes that vertices are inserted in topological
// order.
type Topological struct {
metrics
// Context used for logging
ctx *snow.Context
// Threshold for confidence increases
params Parameters
numProcessing prometheus.Gauge
numAccepted, numRejected prometheus.Counter
// Maps vtxID -> vtx
nodes map[[32]byte]Vertex
// Tracks the conflict relations
@ -64,33 +61,8 @@ func (ta *Topological) Initialize(ctx *snow.Context, params Parameters, frontier
ta.ctx = ctx
ta.params = params
ta.numProcessing = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: params.Namespace,
Name: "vtx_processing",
Help: "Number of currently processing vertices",
})
ta.numAccepted = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: params.Namespace,
Name: "vtx_accepted",
Help: "Number of vertices accepted",
})
ta.numRejected = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: params.Namespace,
Name: "vtx_rejected",
Help: "Number of vertices rejected",
})
if err := ta.params.Metrics.Register(ta.numProcessing); err != nil {
ta.ctx.Log.Error("Failed to register vtx_processing statistics due to %s", err)
}
if err := ta.params.Metrics.Register(ta.numAccepted); err != nil {
ta.ctx.Log.Error("Failed to register vtx_accepted statistics due to %s", err)
}
if err := ta.params.Metrics.Register(ta.numRejected); err != nil {
ta.ctx.Log.Error("Failed to register vtx_rejected statistics due to %s", err)
if err := ta.metrics.Initialize(ctx.Log, params.Namespace, params.Metrics); err != nil {
ta.ctx.Log.Error("%s", err)
}
ta.nodes = make(map[[32]byte]Vertex)
@ -133,7 +105,7 @@ func (ta *Topological) Add(vtx Vertex) {
}
ta.nodes[key] = vtx // Add this vertex to the set of nodes
ta.numProcessing.Inc()
ta.metrics.Issued(vtxID)
ta.update(vtx) // Update the vertex and it's ancestry
}
@ -367,9 +339,8 @@ func (ta *Topological) update(vtx Vertex) {
for _, dep := range deps {
if status := dep.Status(); status == choices.Rejected {
vtx.Reject() // My parent is rejected, so I should be rejected
ta.numRejected.Inc()
delete(ta.nodes, vtxKey)
ta.numProcessing.Dec()
ta.metrics.Rejected(vtxID)
ta.preferenceCache[vtxKey] = false
ta.virtuousCache[vtxKey] = false
@ -420,18 +391,14 @@ func (ta *Topological) update(vtx Vertex) {
// I'm acceptable, why not accept?
ta.ctx.ConsensusDispatcher.Accept(ta.ctx.ChainID, vtxID, vtx.Bytes())
vtx.Accept()
ta.numAccepted.Inc()
delete(ta.nodes, vtxKey)
ta.numProcessing.Dec()
ta.metrics.Accepted(vtxID)
case rejectable:
// I'm rejectable, why not reject?
vtx.Reject()
ta.ctx.ConsensusDispatcher.Reject(ta.ctx.ChainID, vtxID, vtx.Bytes())
ta.numRejected.Inc()
delete(ta.nodes, vtxKey)
ta.numProcessing.Dec()
ta.metrics.Rejected(vtxID)
}
}

View File

@ -0,0 +1,87 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package snowman
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
)
type metrics struct {
numProcessing prometheus.Gauge
latAccepted, latRejected prometheus.Histogram
clock timer.Clock
processing map[[32]byte]time.Time
}
// Initialize implements the Engine interface
func (m *metrics) Initialize(log logging.Logger, namespace string, registerer prometheus.Registerer) error {
m.processing = make(map[[32]byte]time.Time)
m.numProcessing = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "processing",
Help: "Number of currently processing blocks",
})
m.latAccepted = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "accepted",
Help: "Latency of accepting from the time the block was issued in milliseconds",
Buckets: timer.Buckets,
})
m.latRejected = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "rejected",
Help: "Latency of rejecting from the time the block was issued in milliseconds",
Buckets: timer.Buckets,
})
if err := registerer.Register(m.numProcessing); err != nil {
return fmt.Errorf("Failed to register processing statistics due to %w", err)
}
if err := registerer.Register(m.latAccepted); err != nil {
return fmt.Errorf("Failed to register accepted statistics due to %w", err)
}
if err := registerer.Register(m.latRejected); err != nil {
return fmt.Errorf("Failed to register rejected statistics due to %w", err)
}
return nil
}
func (m *metrics) Issued(id ids.ID) {
m.processing[id.Key()] = m.clock.Time()
m.numProcessing.Inc()
}
func (m *metrics) Accepted(id ids.ID) {
key := id.Key()
start := m.processing[key]
end := m.clock.Time()
delete(m.processing, key)
m.latAccepted.Observe(float64(end.Sub(start).Milliseconds()))
m.numProcessing.Dec()
}
func (m *metrics) Rejected(id ids.ID) {
key := id.Key()
start := m.processing[key]
end := m.clock.Time()
delete(m.processing, key)
m.latRejected.Observe(float64(end.Sub(start).Milliseconds()))
m.numProcessing.Dec()
}

View File

@ -4,8 +4,6 @@
package snowman
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/consensus/snowball"
@ -21,12 +19,11 @@ func (TopologicalFactory) New() Consensus { return &Topological{} }
// strongly preferred branch. This tree structure amortizes network polls to
// vote on more than just the next position.
type Topological struct {
metrics
ctx *snow.Context
params snowball.Parameters
numProcessing prometheus.Gauge
numAccepted, numRejected prometheus.Counter
head ids.ID
nodes map[[32]byte]node // ParentID -> Snowball instance
tail ids.ID
@ -62,33 +59,8 @@ func (ts *Topological) Initialize(ctx *snow.Context, params snowball.Parameters,
ts.ctx = ctx
ts.params = params
ts.numProcessing = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: params.Namespace,
Name: "processing",
Help: "Number of currently processing blocks",
})
ts.numAccepted = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: params.Namespace,
Name: "accepted",
Help: "Number of blocks accepted",
})
ts.numRejected = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: params.Namespace,
Name: "rejected",
Help: "Number of blocks rejected",
})
if err := ts.params.Metrics.Register(ts.numProcessing); err != nil {
ts.ctx.Log.Error("Failed to register processing statistics due to %s", err)
}
if err := ts.params.Metrics.Register(ts.numAccepted); err != nil {
ts.ctx.Log.Error("Failed to register accepted statistics due to %s", err)
}
if err := ts.params.Metrics.Register(ts.numRejected); err != nil {
ts.ctx.Log.Error("Failed to register rejected statistics due to %s", err)
if err := ts.metrics.Initialize(ctx.Log, params.Namespace, params.Metrics); err != nil {
ts.ctx.Log.Error("%s", err)
}
ts.head = rootID
@ -115,6 +87,7 @@ func (ts *Topological) Add(blk Block) {
bytes := blk.Bytes()
ts.ctx.DecisionDispatcher.Issue(ts.ctx.ChainID, blkID, bytes)
ts.ctx.ConsensusDispatcher.Issue(ts.ctx.ChainID, blkID, bytes)
ts.metrics.Issued(blkID)
if parent, ok := ts.nodes[parentKey]; ok {
parent.Add(blk)
@ -130,8 +103,6 @@ func (ts *Topological) Add(blk Block) {
if ts.tail.Equals(parentID) {
ts.tail = blkID
}
ts.numProcessing.Inc()
} else {
// If the ancestor is missing, this means the ancestor must have already
// been pruned. Therefore, the dependent is transitively rejected.
@ -140,8 +111,7 @@ func (ts *Topological) Add(blk Block) {
bytes := blk.Bytes()
ts.ctx.DecisionDispatcher.Reject(ts.ctx.ChainID, blkID, bytes)
ts.ctx.ConsensusDispatcher.Reject(ts.ctx.ChainID, blkID, bytes)
ts.numRejected.Inc()
ts.metrics.Rejected(blkID)
}
}
@ -319,7 +289,6 @@ func (ts *Topological) vote(voteStack []votes) ids.ID {
ts.accept(parentNode)
tail = parentNode.sb.Preference()
delete(ts.nodes, voteParentKey)
ts.numProcessing.Dec()
} else {
ts.nodes[voteParentKey] = parentNode
}
@ -367,8 +336,8 @@ func (ts *Topological) accept(n node) {
bytes := child.Bytes()
ts.ctx.DecisionDispatcher.Reject(ts.ctx.ChainID, childID, bytes)
ts.ctx.ConsensusDispatcher.Reject(ts.ctx.ChainID, childID, bytes)
ts.metrics.Rejected(childID)
ts.numRejected.Inc()
rejects = append(rejects, childID)
}
}
@ -383,7 +352,7 @@ func (ts *Topological) accept(n node) {
ts.ctx.ConsensusDispatcher.Accept(ts.ctx.ChainID, child.ID(), bytes)
child.Accept()
ts.numAccepted.Inc()
ts.metrics.Accepted(pref)
}
// Takes in a list of newly rejected ids and rejects everything that depends on
@ -397,7 +366,6 @@ func (ts *Topological) rejectTransitively(rejected ...ids.ID) {
rejectKey := rejectID.Key()
rejectNode := ts.nodes[rejectKey]
delete(ts.nodes, rejectKey)
ts.numProcessing.Dec()
for childIDBytes, child := range rejectNode.children {
childID := ids.NewID(childIDBytes)
@ -407,8 +375,7 @@ func (ts *Topological) rejectTransitively(rejected ...ids.ID) {
bytes := child.Bytes()
ts.ctx.DecisionDispatcher.Reject(ts.ctx.ChainID, childID, bytes)
ts.ctx.ConsensusDispatcher.Reject(ts.ctx.ChainID, childID, bytes)
ts.numRejected.Inc()
ts.metrics.Rejected(childID)
}
}
}

View File

@ -9,8 +9,6 @@ import (
"sort"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/consensus/snowball"
@ -27,12 +25,11 @@ func (DirectedFactory) New() Consensus { return &Directed{} }
// Directed is an implementation of a multi-color, non-transitive, snowball
// instance
type Directed struct {
metrics
ctx *snow.Context
params snowball.Parameters
numProcessingVirtuous, numProcessingRogue prometheus.Gauge
numAccepted, numRejected prometheus.Counter
// Each element of preferences is the ID of a transaction that is preferred.
// That is, each transaction has no out edges
preferences ids.Set
@ -75,42 +72,8 @@ func (dg *Directed) Initialize(ctx *snow.Context, params snowball.Parameters) {
dg.ctx = ctx
dg.params = params
dg.numProcessingVirtuous = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: params.Namespace,
Name: "tx_processing_virtuous",
Help: "Number of processing virtuous transactions",
})
dg.numProcessingRogue = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: params.Namespace,
Name: "tx_processing_rogue",
Help: "Number of processing rogue transactions",
})
dg.numAccepted = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: params.Namespace,
Name: "tx_accepted",
Help: "Number of transactions accepted",
})
dg.numRejected = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: params.Namespace,
Name: "tx_rejected",
Help: "Number of transactions rejected",
})
if err := dg.params.Metrics.Register(dg.numProcessingVirtuous); err != nil {
dg.ctx.Log.Error("Failed to register tx_processing_virtuous statistics due to %s", err)
}
if err := dg.params.Metrics.Register(dg.numProcessingRogue); err != nil {
dg.ctx.Log.Error("Failed to register tx_processing_rogue statistics due to %s", err)
}
if err := dg.params.Metrics.Register(dg.numAccepted); err != nil {
dg.ctx.Log.Error("Failed to register tx_accepted statistics due to %s", err)
}
if err := dg.params.Metrics.Register(dg.numRejected); err != nil {
dg.ctx.Log.Error("Failed to register tx_rejected statistics due to %s", err)
if err := dg.metrics.Initialize(ctx.Log, params.Namespace, params.Metrics); err != nil {
dg.ctx.Log.Error("%s", err)
}
dg.spends = make(map[[32]byte]ids.Set)
@ -169,11 +132,11 @@ func (dg *Directed) Add(tx Tx) {
if inputs.Len() == 0 {
tx.Accept()
dg.ctx.DecisionDispatcher.Accept(dg.ctx.ChainID, txID, bytes)
dg.numAccepted.Inc()
dg.metrics.Issued(txID)
dg.metrics.Accepted(txID)
return
}
id := tx.ID()
fn := &flatNode{tx: tx}
// Note: Below, for readability, we sometimes say "transaction" when we actually mean
@ -194,38 +157,31 @@ func (dg *Directed) Add(tx Tx) {
conflictKey := conflictID.Key()
conflict := dg.nodes[conflictKey]
if !conflict.rogue {
dg.numProcessingVirtuous.Dec()
dg.numProcessingRogue.Inc()
}
dg.virtuous.Remove(conflictID)
dg.virtuousVoting.Remove(conflictID)
conflict.rogue = true
conflict.ins.Add(id)
conflict.ins.Add(txID)
dg.nodes[conflictKey] = conflict
}
// Add Tx to list of transactions consuming UTXO whose ID is id
spends.Add(id)
spends.Add(txID)
dg.spends[inputKey] = spends
}
fn.rogue = fn.outs.Len() != 0 // Mark this transaction as rogue if it has conflicts
// Add the node representing Tx to the node set
dg.nodes[id.Key()] = fn
dg.nodes[txID.Key()] = fn
if !fn.rogue {
// I'm not rogue
dg.virtuous.Add(id)
dg.virtuousVoting.Add(id)
dg.virtuous.Add(txID)
dg.virtuousVoting.Add(txID)
// If I'm not rogue, I must be preferred
dg.preferences.Add(id)
dg.numProcessingVirtuous.Inc()
} else {
dg.numProcessingRogue.Inc()
dg.preferences.Add(txID)
}
dg.metrics.Issued(txID)
// Tx can be accepted only if the transactions it depends on are also accepted
// If any transactions that Tx depends on are rejected, reject Tx
@ -361,12 +317,6 @@ func (dg *Directed) reject(ids ...ids.ID) {
conf := dg.nodes[conflictKey]
delete(dg.nodes, conflictKey)
if conf.rogue {
dg.numProcessingRogue.Dec()
} else {
dg.numProcessingVirtuous.Dec()
}
dg.preferences.Remove(conflict)
// remove the edge between this node and all its neighbors
@ -376,7 +326,8 @@ func (dg *Directed) reject(ids ...ids.ID) {
// Mark it as rejected
conf.tx.Reject()
dg.ctx.DecisionDispatcher.Reject(dg.ctx.ChainID, conf.tx.ID(), conf.tx.Bytes())
dg.numRejected.Inc()
dg.metrics.Rejected(conflict)
dg.pendingAccept.Abandon(conflict)
dg.pendingReject.Fulfill(conflict)
}
@ -466,13 +417,7 @@ func (a *directedAccepter) Update() {
a.fn.accepted = true
a.fn.tx.Accept()
a.dg.ctx.DecisionDispatcher.Accept(a.dg.ctx.ChainID, id, a.fn.tx.Bytes())
a.dg.numAccepted.Inc()
if a.fn.rogue {
a.dg.numProcessingRogue.Dec()
} else {
a.dg.numProcessingVirtuous.Dec()
}
a.dg.metrics.Accepted(id)
a.dg.pendingAccept.Fulfill(id)
a.dg.pendingReject.Abandon(id)

View File

@ -9,8 +9,6 @@ import (
"sort"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/consensus/snowball"
@ -27,12 +25,11 @@ func (InputFactory) New() Consensus { return &Input{} }
// Input is an implementation of a multi-color, non-transitive, snowball
// instance
type Input struct {
metrics
ctx *snow.Context
params snowball.Parameters
numProcessing prometheus.Gauge
numAccepted, numRejected prometheus.Counter
// preferences is the set of consumerIDs that have only in edges
// virtuous is the set of consumerIDs that have no edges
preferences, virtuous, virtuousVoting ids.Set
@ -70,35 +67,8 @@ func (ig *Input) Initialize(ctx *snow.Context, params snowball.Parameters) {
ig.ctx = ctx
ig.params = params
namespace := fmt.Sprintf("gecko_%s", ig.ctx.ChainID)
ig.numProcessing = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "tx_processing",
Help: "Number of processing transactions",
})
ig.numAccepted = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "tx_accepted",
Help: "Number of transactions accepted",
})
ig.numRejected = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "tx_rejected",
Help: "Number of transactions rejected",
})
if err := ig.params.Metrics.Register(ig.numProcessing); err != nil {
ig.ctx.Log.Error("Failed to register tx_processing statistics due to %s", err)
}
if err := ig.params.Metrics.Register(ig.numAccepted); err != nil {
ig.ctx.Log.Error("Failed to register tx_accepted statistics due to %s", err)
}
if err := ig.params.Metrics.Register(ig.numRejected); err != nil {
ig.ctx.Log.Error("Failed to register tx_rejected statistics due to %s", err)
if err := ig.metrics.Initialize(ctx.Log, params.Namespace, params.Metrics); err != nil {
ig.ctx.Log.Error("%s", err)
}
ig.txs = make(map[[32]byte]txNode)
@ -136,11 +106,11 @@ func (ig *Input) Add(tx Tx) {
if inputs.Len() == 0 {
tx.Accept()
ig.ctx.DecisionDispatcher.Accept(ig.ctx.ChainID, txID, bytes)
ig.numAccepted.Inc()
ig.metrics.Issued(txID)
ig.metrics.Accepted(txID)
return
}
id := tx.ID()
cn := txNode{tx: tx}
virtuous := true
// If there are inputs, they must be voted on
@ -154,26 +124,25 @@ func (ig *Input) Add(tx Tx) {
ig.virtuousVoting.Remove(conflictID)
}
} else {
input.preference = id // If there isn't a conflict, I'm preferred
input.preference = txID // If there isn't a conflict, I'm preferred
}
input.conflicts.Add(id)
input.conflicts.Add(txID)
ig.inputs[consumptionKey] = input
virtuous = virtuous && !exists
}
// Add the node to the set
ig.txs[id.Key()] = cn
ig.txs[txID.Key()] = cn
if virtuous {
// If I'm preferred in all my conflict sets, I'm preferred.
// Because the preference graph is a DAG, there will always be at least
// one preferred consumer, if there is a consumer
ig.preferences.Add(id)
ig.virtuous.Add(id)
ig.virtuousVoting.Add(id)
ig.preferences.Add(txID)
ig.virtuous.Add(txID)
ig.virtuousVoting.Add(txID)
}
ig.numProcessing.Inc()
ig.metrics.Issued(txID)
toReject := &inputRejector{
ig: ig,
@ -321,7 +290,6 @@ func (ig *Input) reject(ids ...ids.ID) {
conflictKey := conflict.Key()
cn := ig.txs[conflictKey]
delete(ig.txs, conflictKey)
ig.numProcessing.Dec()
ig.preferences.Remove(conflict) // A rejected value isn't preferred
// Remove from all conflict sets
@ -330,7 +298,7 @@ func (ig *Input) reject(ids ...ids.ID) {
// Mark it as rejected
cn.tx.Reject()
ig.ctx.DecisionDispatcher.Reject(ig.ctx.ChainID, cn.tx.ID(), cn.tx.Bytes())
ig.numRejected.Inc()
ig.metrics.Rejected(conflict)
ig.pendingAccept.Abandon(conflict)
ig.pendingReject.Fulfill(conflict)
}
@ -517,8 +485,7 @@ func (a *inputAccepter) Update() {
// Mark it as accepted
a.tn.tx.Accept()
a.ig.ctx.DecisionDispatcher.Accept(a.ig.ctx.ChainID, id, a.tn.tx.Bytes())
a.ig.numAccepted.Inc()
a.ig.numProcessing.Dec()
a.ig.metrics.Accepted(id)
a.ig.pendingAccept.Fulfill(id)
a.ig.pendingReject.Abandon(id)

View File

@ -0,0 +1,87 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package snowstorm
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
)
type metrics struct {
numProcessing prometheus.Gauge
latAccepted, latRejected prometheus.Histogram
clock timer.Clock
processing map[[32]byte]time.Time
}
// Initialize implements the Engine interface
func (m *metrics) Initialize(log logging.Logger, namespace string, registerer prometheus.Registerer) error {
m.processing = make(map[[32]byte]time.Time)
m.numProcessing = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "tx_processing",
Help: "Number of processing transactions",
})
m.latAccepted = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "tx_accepted",
Help: "Latency of accepting from the time the transaction was issued in milliseconds",
Buckets: timer.Buckets,
})
m.latRejected = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "tx_rejected",
Help: "Latency of rejecting from the time the transaction was issued in milliseconds",
Buckets: timer.Buckets,
})
if err := registerer.Register(m.numProcessing); err != nil {
return fmt.Errorf("Failed to register tx_processing statistics due to %s", err)
}
if err := registerer.Register(m.latAccepted); err != nil {
return fmt.Errorf("Failed to register tx_accepted statistics due to %s", err)
}
if err := registerer.Register(m.latRejected); err != nil {
return fmt.Errorf("Failed to register tx_rejected statistics due to %s", err)
}
return nil
}
func (m *metrics) Issued(id ids.ID) {
m.processing[id.Key()] = m.clock.Time()
m.numProcessing.Inc()
}
func (m *metrics) Accepted(id ids.ID) {
key := id.Key()
start := m.processing[key]
end := m.clock.Time()
delete(m.processing, key)
m.latAccepted.Observe(float64(end.Sub(start).Milliseconds()))
m.numProcessing.Dec()
}
func (m *metrics) Rejected(id ids.ID) {
key := id.Key()
start := m.processing[key]
end := m.clock.Time()
delete(m.processing, key)
m.latRejected.Observe(float64(end.Sub(start).Milliseconds()))
m.numProcessing.Dec()
}

21
utils/timer/latency.go Normal file
View File

@ -0,0 +1,21 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package timer
// Useful latency buckets
var (
Buckets = []float64{
10, // 10 ms is ~ instant
100, // 100 ms
250, // 250 ms
500, // 500 ms
1000, // 1 second
1500, // 1.5 seconds
2000, // 2 seconds
3000, // 3 seconds
5000, // 5 seconds
10000, // 10 seconds
// anything larger than 10 seconds will be bucketed together
}
)