diff --git a/p2p/trust/config.go b/p2p/trust/config.go index 6fb0e681..b20a8b2c 100644 --- a/p2p/trust/config.go +++ b/p2p/trust/config.go @@ -51,6 +51,5 @@ func customConfig(tmc TrustMetricConfig) TrustMetricConfig { tmc.TrackingWindow >= config.IntervalLength { config.TrackingWindow = tmc.TrackingWindow } - return config } diff --git a/p2p/trust/metric.go b/p2p/trust/metric.go index beb462b2..bf6ddb5e 100644 --- a/p2p/trust/metric.go +++ b/p2p/trust/metric.go @@ -7,6 +7,8 @@ import ( "math" "sync" "time" + + cmn "github.com/tendermint/tmlibs/common" ) //--------------------------------------------------------------------------------------- @@ -31,6 +33,8 @@ type MetricHistoryJSON struct { // TrustMetric - keeps track of peer reliability // See tendermint/docs/architecture/adr-006-trust-metric.md for details type TrustMetric struct { + cmn.BaseService + // Mutex that protects the metric from concurrent access mtx sync.Mutex @@ -73,16 +77,18 @@ type TrustMetric struct { // While true, history data is not modified paused bool - // Signal channel for stopping the trust metric go-routine - stop chan struct{} + // Used during testing in order to control the passing of time intervals + testTicker MetricTicker } -// NewMetric returns a trust metric with the default configuration +// NewMetric returns a trust metric with the default configuration. +// Use Start to begin tracking the quality of peer behavior over time func NewMetric() *TrustMetric { return NewMetricWithConfig(DefaultConfig()) } -// NewMetricWithConfig returns a trust metric with a custom configuration +// NewMetricWithConfig returns a trust metric with a custom configuration. +// Use Start to begin tracking the quality of peer behavior over time func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { tm := new(TrustMetric) config := customConfig(tmc) @@ -97,13 +103,24 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 // This metric has a perfect history so far tm.historyValue = 1.0 - // Setup the stop channel - tm.stop = make(chan struct{}) - go tm.processRequests() + tm.BaseService = *cmn.NewBaseService(nil, "TrustMetric", tm) return tm } +// OnStart implements Service +func (tm *TrustMetric) OnStart() error { + if err := tm.BaseService.OnStart(); err != nil { + return err + } + go tm.processRequests() + return nil +} + +// OnStop implements Service +// Nothing to do since the goroutine shuts down by itself via BaseService.Quit +func (tm *TrustMetric) OnStop() {} + // Returns a snapshot of the trust metric history data func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { tm.mtx.Lock() @@ -155,11 +172,6 @@ func (tm *TrustMetric) Pause() { tm.paused = true } -// Stop tells the metric to stop recording data over time intervals -func (tm *TrustMetric) Stop() { - tm.stop <- struct{}{} -} - // BadEvents indicates that an undesirable event(s) took place func (tm *TrustMetric) BadEvents(num int) { tm.mtx.Lock() @@ -232,6 +244,16 @@ func (tm *TrustMetric) NextTimeInterval() { tm.bad = 0 } +// SetTicker allows a TestTicker to be provided that will manually control +// the passing of time from the perspective of the TrustMetric. +// The ticker must be set before Start is called on the metric +func (tm *TrustMetric) SetTicker(ticker MetricTicker) { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + tm.testTicker = ticker +} + // Copy returns a new trust metric with members containing the same values func (tm *TrustMetric) Copy() *TrustMetric { tm.mtx.Lock() @@ -255,22 +277,28 @@ func (tm *TrustMetric) Copy() *TrustMetric { good: tm.good, bad: tm.bad, paused: tm.paused, - stop: make(chan struct{}), } + } /* Private methods */ // This method is for a goroutine that handles all requests on the metric func (tm *TrustMetric) processRequests() { - t := time.NewTicker(tm.intervalLen) + t := tm.testTicker + if t == nil { + // No test ticker was provided, so we create a normal ticker + t = NewTicker(tm.intervalLen) + } defer t.Stop() + // Obtain the raw channel + tick := t.GetChannel() loop: for { select { - case <-t.C: + case <-tick: tm.NextTimeInterval() - case <-tm.stop: + case <-tm.Quit: // Stop all further tracking for this metric break loop } diff --git a/p2p/trust/metric_test.go b/p2p/trust/metric_test.go index 92272615..00219a19 100644 --- a/p2p/trust/metric_test.go +++ b/p2p/trust/metric_test.go @@ -9,6 +9,7 @@ import ( func TestTrustMetricScores(t *testing.T) { tm := NewMetric() + tm.Start() // Perfect score tm.GoodEvents(1) @@ -31,6 +32,7 @@ func TestTrustMetricConfig(t *testing.T) { } tm := NewMetricWithConfig(config) + tm.Start() // The max time intervals should be the TrackingWindow / IntervalLen assert.Equal(t, int(config.TrackingWindow/config.IntervalLength), tm.maxIntervals) @@ -40,51 +42,54 @@ func TestTrustMetricConfig(t *testing.T) { assert.Equal(t, dc.ProportionalWeight, tm.proportionalWeight) assert.Equal(t, dc.IntegralWeight, tm.integralWeight) tm.Stop() + tm.Wait() config.ProportionalWeight = 0.3 config.IntegralWeight = 0.7 tm = NewMetricWithConfig(config) + tm.Start() // These weights should be equal to our custom values assert.Equal(t, config.ProportionalWeight, tm.proportionalWeight) assert.Equal(t, config.IntegralWeight, tm.integralWeight) tm.Stop() + tm.Wait() } func TestTrustMetricStopPause(t *testing.T) { - // Cause time intervals to pass quickly - config := TrustMetricConfig{ - TrackingWindow: 5 * time.Minute, - IntervalLength: 10 * time.Millisecond, - } - - tm := NewMetricWithConfig(config) - + // The TestTicker will provide manual control over + // the passing of time within the metric + tt := NewTestTicker() + tm := NewMetric() + tm.SetTicker(tt) + tm.Start() // Allow some time intervals to pass and pause - time.Sleep(50 * time.Millisecond) + tt.NextTick() + tt.NextTick() tm.Pause() - // Give the pause some time to take place - time.Sleep(10 * time.Millisecond) first := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged - time.Sleep(50 * time.Millisecond) - assert.Equal(t, first, tm.numIntervals) + tt.NextTick() + tt.NextTick() + assert.Equal(t, first, tm.Copy().numIntervals) // Get the trust metric activated again tm.GoodEvents(5) // Allow some time intervals to pass and stop - time.Sleep(50 * time.Millisecond) + tt.NextTick() + tt.NextTick() tm.Stop() - // Give the stop some time to take place - time.Sleep(10 * time.Millisecond) + tm.Wait() second := tm.Copy().numIntervals - // Allow more time to pass and check the intervals are unchanged - time.Sleep(50 * time.Millisecond) - assert.Equal(t, second, tm.numIntervals) + // Allow more intervals to pass while the metric is stopped + // and check that the number of intervals match + tm.NextTimeInterval() + tm.NextTimeInterval() + assert.Equal(t, second+2, tm.Copy().numIntervals) - if first >= second { + if first > second { t.Fatalf("numIntervals should always increase or stay the same over time") } } diff --git a/p2p/trust/store.go b/p2p/trust/store.go index fd84ac96..0e61b065 100644 --- a/p2p/trust/store.go +++ b/p2p/trust/store.go @@ -34,7 +34,8 @@ type TrustMetricStore struct { } // NewTrustMetricStore returns a store that saves data to the DB -// and uses the config when creating new trust metrics +// and uses the config when creating new trust metrics. +// Use Start to to initialize the trust metric store func NewTrustMetricStore(db dbm.DB, tmc TrustMetricConfig) *TrustMetricStore { tms := &TrustMetricStore{ peerMetrics: make(map[string]*TrustMetric), @@ -84,6 +85,18 @@ func (tms *TrustMetricStore) Size() int { return tms.size() } +// AddPeerTrustMetric takes an existing trust metric and associates it with a peer key. +// The caller is expected to call Start on the TrustMetric being added +func (tms *TrustMetricStore) AddPeerTrustMetric(key string, tm *TrustMetric) { + tms.mtx.Lock() + defer tms.mtx.Unlock() + + if key == "" || tm == nil { + return + } + tms.peerMetrics[key] = tm +} + // GetPeerTrustMetric returns a trust metric by peer key func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric { tms.mtx.Lock() @@ -93,6 +106,7 @@ func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric { if !ok { // If the metric is not available, we will create it tm = NewMetricWithConfig(tms.config) + tm.Start() // The metric needs to be in the map tms.peerMetrics[key] = tm } @@ -149,6 +163,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { for key, p := range peers { tm := NewMetricWithConfig(tms.config) + tm.Start() tm.Init(p) // Load the peer trust metric into the store tms.peerMetrics[key] = tm diff --git a/p2p/trust/store_test.go b/p2p/trust/store_test.go index c0306bba..4e555396 100644 --- a/p2p/trust/store_test.go +++ b/p2p/trust/store_test.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "os" "testing" - "time" "github.com/stretchr/testify/assert" dbm "github.com/tendermint/tmlibs/db" @@ -24,46 +23,50 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) { historyDB := dbm.NewDB("trusthistory", "goleveldb", dir) - config := TrustMetricConfig{ - TrackingWindow: 5 * time.Minute, - IntervalLength: 50 * time.Millisecond, - } - // 0 peers saved - store := NewTrustMetricStore(historyDB, config) + store := NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) store.saveToDB() // Load the data from the file - store = NewTrustMetricStore(historyDB, config) + store = NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) - store.loadFromDB() + store.Start() // Make sure we still have 0 entries assert.Zero(t, store.Size()) + // 100 TestTickers + var tt []*TestTicker + for i := 0; i < 100; i++ { + // The TestTicker will provide manual control over + // the passing of time within the metric + tt = append(tt, NewTestTicker()) + } // 100 peers for i := 0; i < 100; i++ { key := fmt.Sprintf("peer_%d", i) - tm := store.GetPeerTrustMetric(key) + tm := NewMetric() + + tm.SetTicker(tt[i]) + tm.Start() + store.AddPeerTrustMetric(key, tm) tm.BadEvents(10) tm.GoodEvents(1) } - // Check that we have 100 entries and save assert.Equal(t, 100, store.Size()) - // Give the metrics time to process the history data - time.Sleep(1 * time.Second) - - // Stop all the trust metrics and save - for _, tm := range store.peerMetrics { - tm.Stop() + // Give the 100 metrics time to process the history data + for i := 0; i < 100; i++ { + tt[i].NextTick() + tt[i].NextTick() } - store.saveToDB() + // Stop all the trust metrics and save + store.Stop() // Load the data from the DB - store = NewTrustMetricStore(historyDB, config) + store = NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) - store.loadFromDB() + store.Start() // Check that we still have 100 peers with imperfect trust values assert.Equal(t, 100, store.Size()) @@ -71,10 +74,7 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) { assert.NotEqual(t, 1.0, tm.TrustValue()) } - // Stop all the trust metrics - for _, tm := range store.peerMetrics { - tm.Stop() - } + store.Stop() } func TestTrustMetricStoreConfig(t *testing.T) { @@ -88,6 +88,7 @@ func TestTrustMetricStoreConfig(t *testing.T) { // Create a store with custom config store := NewTrustMetricStore(historyDB, config) store.SetLogger(log.TestingLogger()) + store.Start() // Have the store make us a metric with the config tm := store.GetPeerTrustMetric("TestKey") @@ -95,7 +96,7 @@ func TestTrustMetricStoreConfig(t *testing.T) { // Check that the options made it to the metric assert.Equal(t, 0.5, tm.proportionalWeight) assert.Equal(t, 0.5, tm.integralWeight) - tm.Stop() + store.Stop() } func TestTrustMetricStoreLookup(t *testing.T) { @@ -103,6 +104,7 @@ func TestTrustMetricStoreLookup(t *testing.T) { store := NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) + store.Start() // Create 100 peers in the trust metric store for i := 0; i < 100; i++ { @@ -114,10 +116,7 @@ func TestTrustMetricStoreLookup(t *testing.T) { assert.NotNil(t, ktm, "Expected to find TrustMetric %s but wasn't there.", key) } - // Stop all the trust metrics - for _, tm := range store.peerMetrics { - tm.Stop() - } + store.Stop() } func TestTrustMetricStorePeerScore(t *testing.T) { @@ -125,6 +124,7 @@ func TestTrustMetricStorePeerScore(t *testing.T) { store := NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) + store.Start() key := "TestKey" tm := store.GetPeerTrustMetric(key) @@ -148,5 +148,5 @@ func TestTrustMetricStorePeerScore(t *testing.T) { // We will remember our experiences with this peer tm = store.GetPeerTrustMetric(key) assert.NotEqual(t, 100, tm.TrustScore()) - tm.Stop() + store.Stop() } diff --git a/p2p/trust/ticker.go b/p2p/trust/ticker.go new file mode 100644 index 00000000..bce9fcc2 --- /dev/null +++ b/p2p/trust/ticker.go @@ -0,0 +1,62 @@ +// Copyright 2017 Tendermint. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package trust + +import ( + "time" +) + +// MetricTicker provides a single ticker interface for the trust metric +type MetricTicker interface { + // GetChannel returns the receive only channel that fires at each time interval + GetChannel() <-chan time.Time + + // Stop will halt further activity on the ticker channel + Stop() +} + +// The ticker used during testing that provides manual control over time intervals +type TestTicker struct { + C chan time.Time + stopped bool +} + +// NewTestTicker returns our ticker used within test routines +func NewTestTicker() *TestTicker { + c := make(chan time.Time, 1) + return &TestTicker{ + C: c, + } +} + +func (t *TestTicker) GetChannel() <-chan time.Time { + return t.C +} + +func (t *TestTicker) Stop() { + t.stopped = true +} + +// NextInterval manually sends Time on the ticker channel +func (t *TestTicker) NextTick() { + if t.stopped { + return + } + t.C <- time.Now() +} + +// Ticker is just a wrap around time.Ticker that allows it +// to meet the requirements of our interface +type Ticker struct { + *time.Ticker +} + +// NewTicker returns a normal time.Ticker wrapped to meet our interface +func NewTicker(d time.Duration) *Ticker { + return &Ticker{time.NewTicker(d)} +} + +func (t *Ticker) GetChannel() <-chan time.Time { + return t.C +}