metrics: add snapshot meter by duration

This commit is contained in:
Miya Chen 2017-10-11 14:32:49 +08:00
parent e3db195507
commit d91a0b7e69
2 changed files with 92 additions and 46 deletions

View File

@ -18,33 +18,33 @@ package metrics
import (
"context"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/rcrowley/go-metrics"
"github.com/getamis/istanbul-tools/client"
"github.com/getamis/istanbul-tools/container"
)
type StopSnapshot func()
type metricsManager struct {
registry *DefaultRegistry
SentTxCounter metrics.Counter
TxErrCounter metrics.Counter
ReqMeter metrics.Meter
ExcutedTxCounter metrics.Counter
RespMeter metrics.Meter
UnknownTxCounter metrics.Counter
TxLatencyTimer metrics.Timer
BlockPeriodTimer metrics.Timer
BlockLatencyTimer metrics.Timer
TPSBlockHistogram metrics.Histogram
SentTxCounter *Counter
TxErrCounter *Counter
ExcutedTxCounter *Counter
UnknownTxCounter *Counter
ReqMeter *Meter
RespMeter *Meter
TxLatencyTimer *Timer
BlockPeriodTimer *Timer
BlockLatencyTimer *Timer
}
func newMetricsManager() *metricsManager {
@ -57,7 +57,6 @@ func newMetricsManager() *metricsManager {
UnknownTxCounter: r.NewCounter("tx/unknown"),
ReqMeter: r.NewMeter("tx/rps"),
RespMeter: r.NewMeter("tx/tps/response"),
TPSBlockHistogram: r.NewHistogram("tx/tps/block"),
TxLatencyTimer: r.NewTimer("tx/latency"),
BlockPeriodTimer: r.NewTimer("block/period"),
BlockLatencyTimer: r.NewTimer("block/latency"),
@ -68,6 +67,33 @@ func (m *metricsManager) Export() {
m.registry.Export()
}
func (m *metricsManager) SnapshotMeter(meters []*Meter, d time.Duration) StopSnapshot {
stop := make(chan struct{})
stopFn := func() {
close(stop)
}
go func() {
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ticker.C:
for _, metric := range meters {
snapshot := metric.Snapshot()
his := m.registry.NewHistogram(fmt.Sprintf("%s/histogram", metric.Name()))
his.Update(int64(snapshot.Rate1()))
}
case <-stop:
return
}
}
}()
return stopFn
}
// --------------------------------------------------------------------------------------------------
type metricChain struct {
container.Blockchain
@ -77,7 +103,8 @@ type metricChain struct {
txStartCh chan *txInfo
txDoneCh chan *txInfo
metricsMgr *metricsManager
metricsMgr *metricsManager
stopSnapshot StopSnapshot
wg sync.WaitGroup
quit chan struct{}
@ -89,7 +116,7 @@ func NewMetricChain(blockchain container.Blockchain) container.Blockchain {
}
mc := &metricChain{
Blockchain: blockchain,
headCh: make(chan *ethtypes.Header),
headCh: make(chan *ethtypes.Header, 1000),
txStartCh: make(chan *txInfo, 10000),
txDoneCh: make(chan *txInfo, 10000),
quit: make(chan struct{}),
@ -129,6 +156,8 @@ func (mc *metricChain) Start(strong bool) error {
}
mc.headSubs = append(mc.headSubs, sub)
}
snapshotMeters := []*Meter{mc.metricsMgr.ReqMeter, mc.metricsMgr.RespMeter}
mc.stopSnapshot = mc.metricsMgr.SnapshotMeter(snapshotMeters, 1*time.Minute)
mc.wg.Add(2)
go mc.handleNewHeadEvent()
@ -142,6 +171,7 @@ func (mc *metricChain) Stop(strong bool) error {
sub.Unsubscribe()
}
mc.wg.Wait()
mc.stopSnapshot()
mc.Export()
return mc.Blockchain.Stop(strong)
}
@ -199,14 +229,6 @@ func (mc *metricChain) handleNewHeadEvent() {
mc.metricsMgr.BlockLatencyTimer.Update(now.Sub(preBlockTime))
preBlockTime = now
// check counts of txs
if header.TxHash == ethtypes.DeriveSha(ethtypes.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*ethtypes.Header{}) {
mc.metricsMgr.ExcutedTxCounter.Inc(0)
mc.metricsMgr.RespMeter.Mark(0)
mc.metricsMgr.TPSBlockHistogram.Update(0)
return
}
// get block
blockCh := make(chan *ethtypes.Block, len(mc.eths))
ctx, cancel := context.WithCancel(context.Background())
@ -228,9 +250,6 @@ func (mc *metricChain) handleNewHeadEvent() {
mc.metricsMgr.ExcutedTxCounter.Inc(int64(len(headBlock.Transactions())))
mc.metricsMgr.RespMeter.Mark(int64(len(headBlock.Transactions())))
if blockPeriod > 0 {
mc.metricsMgr.TPSBlockHistogram.Update(int64(len(headBlock.Transactions())) / blockPeriod)
}
// update tx info
for _, tx := range headBlock.Transactions() {

View File

@ -23,37 +23,36 @@ import (
)
type DefaultRegistry struct {
registry metrics.Registry
metrics.Registry
}
func NewRegistry() *DefaultRegistry {
r := metrics.NewRegistry()
return &DefaultRegistry{registry: r}
return &DefaultRegistry{r}
}
func (r *DefaultRegistry) NewCounter(name string) metrics.Counter {
return metrics.GetOrRegisterCounter(name, r.registry)
func (r *DefaultRegistry) NewCounter(name string) *Counter {
return &Counter{metrics.GetOrRegisterCounter(name, r.Registry), name}
}
func (r *DefaultRegistry) NewMeter(name string) metrics.Meter {
return metrics.GetOrRegisterMeter(name, r.registry)
func (r *DefaultRegistry) NewMeter(name string) *Meter {
return &Meter{metrics.GetOrRegisterMeter(name, r.Registry), name}
}
func (r *DefaultRegistry) NewTimer(name string) metrics.Timer {
return metrics.GetOrRegisterTimer(name, r.registry)
func (r *DefaultRegistry) NewTimer(name string) *Timer {
return &Timer{metrics.GetOrRegisterTimer(name, r.Registry), name}
}
func (r *DefaultRegistry) NewHistogram(name string) metrics.Histogram {
return metrics.GetOrRegisterHistogram(name, r.registry, metrics.NewExpDecaySample(1028, 0.015))
func (r *DefaultRegistry) NewHistogram(name string) *Histogram {
return &Histogram{metrics.GetOrRegisterHistogram(name, r.Registry, metrics.NewExpDecaySample(1028, 0.015)), name}
}
// -----------------------------------------------------------------
func (r *DefaultRegistry) Export() {
r.export()
}
func (r *DefaultRegistry) export() {
r.registry.Each(func(name string, i interface{}) {
r.Registry.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case metrics.Counter:
fmt.Printf("counter %s\n", name)
@ -70,7 +69,7 @@ func (r *DefaultRegistry) export() {
fmt.Printf(" error: %v\n", metric.Error())
case metrics.Histogram:
h := metric.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
ps := h.Percentiles([]float64{0.5, 0.75, 0.90, 0.95, 0.99})
fmt.Printf("histogram %s\n", name)
fmt.Printf(" count: %9d\n", h.Count())
fmt.Printf(" min: %9d\n", h.Min())
@ -79,9 +78,9 @@ func (r *DefaultRegistry) export() {
fmt.Printf(" stddev: %e\n", h.StdDev())
fmt.Printf(" median: %e\n", ps[0])
fmt.Printf(" 75%%: %e\n", ps[1])
fmt.Printf(" 95%%: %e\n", ps[2])
fmt.Printf(" 99%%: %e\n", ps[3])
fmt.Printf(" 99.9%%: %e\n", ps[4])
fmt.Printf(" 90%%: %e\n", ps[2])
fmt.Printf(" 95%%: %e\n", ps[3])
fmt.Printf(" 99%%: %e\n", ps[4])
case metrics.Meter:
m := metric.Snapshot()
fmt.Printf("meter %s\n", name)
@ -92,7 +91,7 @@ func (r *DefaultRegistry) export() {
fmt.Printf(" mean rate: %e\n", m.RateMean())
case metrics.Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
ps := t.Percentiles([]float64{0.5, 0.75, 0.90, 0.95, 0.99})
fmt.Printf("timer %s\n", name)
fmt.Printf(" count: %9d\n", t.Count())
fmt.Printf(" min: %e\n", float64(t.Min()))
@ -101,9 +100,9 @@ func (r *DefaultRegistry) export() {
fmt.Printf(" stddev: %e\n", t.StdDev())
fmt.Printf(" median: %e\n", ps[0])
fmt.Printf(" 75%%: %e\n", ps[1])
fmt.Printf(" 95%%: %e\n", ps[2])
fmt.Printf(" 99%%: %e\n", ps[3])
fmt.Printf(" 99.9%%: %e\n", ps[4])
fmt.Printf(" 90%%: %e\n", ps[2])
fmt.Printf(" 95%%: %e\n", ps[3])
fmt.Printf(" 99%%: %e\n", ps[4])
fmt.Printf(" 1-min rate: %e\n", t.Rate1())
fmt.Printf(" 5-min rate: %e\n", t.Rate5())
fmt.Printf(" 15-min rate: %e\n", t.Rate15())
@ -111,3 +110,31 @@ func (r *DefaultRegistry) export() {
}
})
}
type Counter struct {
metrics.Counter
name string
}
func (c *Counter) Name() string { return c.name }
type Meter struct {
metrics.Meter
name string
}
func (m *Meter) Name() string { return m.name }
type Timer struct {
metrics.Timer
name string
}
func (t *Timer) Name() string { return t.name }
type Histogram struct {
metrics.Histogram
name string
}
func (h *Histogram) Name() string { return h.name }