From 7af7d8cfef65404858ca9be236516a25a7641c4e Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Wed, 5 Sep 2018 17:22:42 -0400 Subject: [PATCH] avoid growing backlogs when valSet changes --- consensus/istanbul/core/backlog.go | 14 ++++++++++---- consensus/istanbul/core/backlog_test.go | 22 ++++++++++++---------- consensus/istanbul/core/core.go | 4 ++-- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/consensus/istanbul/core/backlog.go b/consensus/istanbul/core/backlog.go index 73a9732b8..ac62c37e7 100644 --- a/consensus/istanbul/core/backlog.go +++ b/consensus/istanbul/core/backlog.go @@ -88,7 +88,8 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) { c.backlogsMu.Lock() defer c.backlogsMu.Unlock() - backlog := c.backlogs[src] + logger.Debug("Retrieving backlog queue", "for", src.Address(), "backlogs_size", len(c.backlogs)) + backlog := c.backlogs[src.Address()] if backlog == nil { backlog = prque.New() } @@ -107,18 +108,23 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) { backlog.Push(msg, toPriority(msg.Code, p.View)) } } - c.backlogs[src] = backlog + c.backlogs[src.Address()] = backlog } func (c *core) processBacklog() { c.backlogsMu.Lock() defer c.backlogsMu.Unlock() - for src, backlog := range c.backlogs { + for srcAddress, backlog := range c.backlogs { if backlog == nil { continue } - + _, src := c.valSet.GetByAddress(srcAddress) + if src == nil { + // validator is not available + delete(c.backlogs, srcAddress) + continue + } logger := c.logger.New("from", src, "state", c.state) isFuture := false diff --git a/consensus/istanbul/core/backlog_test.go b/consensus/istanbul/core/backlog_test.go index bdef2c698..6e02493bc 100644 --- a/consensus/istanbul/core/backlog_test.go +++ b/consensus/istanbul/core/backlog_test.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" - "github.com/ethereum/go-ethereum/consensus/istanbul/validator" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "gopkg.in/karalabe/cookiejar.v2/collections/prque" @@ -168,14 +167,15 @@ func TestCheckMessage(t *testing.T) { func TestStoreBacklog(t *testing.T) { c := &core{ logger: log.New("backend", "test", "id", 0), - backlogs: make(map[istanbul.Validator]*prque.Prque), + valSet: newTestValidatorSet(1), + backlogs: make(map[common.Address]*prque.Prque), backlogsMu: new(sync.Mutex), } v := &istanbul.View{ Round: big.NewInt(10), Sequence: big.NewInt(10), } - p := validator.New(common.StringToAddress("12345667890")) + p := c.valSet.GetByIndex(0) // push preprepare msg preprepare := &istanbul.Preprepare{ View: v, @@ -187,7 +187,7 @@ func TestStoreBacklog(t *testing.T) { Msg: prepreparePayload, } c.storeBacklog(m, p) - msg := c.backlogs[p].PopItem() + msg := c.backlogs[p.Address()].PopItem() if !reflect.DeepEqual(msg, m) { t.Errorf("message mismatch: have %v, want %v", msg, m) } @@ -204,7 +204,7 @@ func TestStoreBacklog(t *testing.T) { Msg: subjectPayload, } c.storeBacklog(m, p) - msg = c.backlogs[p].PopItem() + msg = c.backlogs[p.Address()].PopItem() if !reflect.DeepEqual(msg, m) { t.Errorf("message mismatch: have %v, want %v", msg, m) } @@ -215,7 +215,7 @@ func TestStoreBacklog(t *testing.T) { Msg: subjectPayload, } c.storeBacklog(m, p) - msg = c.backlogs[p].PopItem() + msg = c.backlogs[p.Address()].PopItem() if !reflect.DeepEqual(msg, m) { t.Errorf("message mismatch: have %v, want %v", msg, m) } @@ -226,7 +226,7 @@ func TestStoreBacklog(t *testing.T) { Msg: subjectPayload, } c.storeBacklog(m, p) - msg = c.backlogs[p].PopItem() + msg = c.backlogs[p.Address()].PopItem() if !reflect.DeepEqual(msg, m) { t.Errorf("message mismatch: have %v, want %v", msg, m) } @@ -238,7 +238,8 @@ func TestProcessFutureBacklog(t *testing.T) { } c := &core{ logger: log.New("backend", "test", "id", 0), - backlogs: make(map[istanbul.Validator]*prque.Prque), + valSet: newTestValidatorSet(1), + backlogs: make(map[common.Address]*prque.Prque), backlogsMu: new(sync.Mutex), backend: backend, current: newRoundState(&istanbul.View{ @@ -254,7 +255,7 @@ func TestProcessFutureBacklog(t *testing.T) { Round: big.NewInt(10), Sequence: big.NewInt(10), } - p := validator.New(common.StringToAddress("12345667890")) + p := c.valSet.GetByIndex(0) // push a future msg subject := &istanbul.Subject{ View: v, @@ -329,8 +330,9 @@ func testProcessBacklog(t *testing.T, msg *message) { } c := &core{ logger: log.New("backend", "test", "id", 0), - backlogs: make(map[istanbul.Validator]*prque.Prque), + backlogs: make(map[common.Address]*prque.Prque), backlogsMu: new(sync.Mutex), + valSet: vset, backend: backend, state: State(msg.Code), current: newRoundState(&istanbul.View{ diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index 5818da976..0cede30ee 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -42,7 +42,7 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine { handlerWg: new(sync.WaitGroup), logger: log.New("address", backend.Address()), backend: backend, - backlogs: make(map[istanbul.Validator]*prque.Prque), + backlogs: make(map[common.Address]*prque.Prque), backlogsMu: new(sync.Mutex), pendingRequests: prque.New(), pendingRequestsMu: new(sync.Mutex), @@ -73,7 +73,7 @@ type core struct { waitingForRoundChange bool validateFn func([]byte, []byte) (common.Address, error) - backlogs map[istanbul.Validator]*prque.Prque + backlogs map[common.Address]*prque.Prque backlogsMu *sync.Mutex current *roundState