avoid growing backlogs when valSet changes

This commit is contained in:
Trung Nguyen 2018-09-05 17:22:42 -04:00
parent 06342c9490
commit 7af7d8cfef
No known key found for this signature in database
GPG Key ID: 4636434ED9505EB7
3 changed files with 24 additions and 16 deletions

View File

@ -88,7 +88,8 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
c.backlogsMu.Lock() c.backlogsMu.Lock()
defer c.backlogsMu.Unlock() defer c.backlogsMu.Unlock()
backlog := c.backlogs[src] logger.Debug("Retrieving backlog queue", "for", src.Address(), "backlogs_size", len(c.backlogs))
backlog := c.backlogs[src.Address()]
if backlog == nil { if backlog == nil {
backlog = prque.New() backlog = prque.New()
} }
@ -107,18 +108,23 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
backlog.Push(msg, toPriority(msg.Code, p.View)) backlog.Push(msg, toPriority(msg.Code, p.View))
} }
} }
c.backlogs[src] = backlog c.backlogs[src.Address()] = backlog
} }
func (c *core) processBacklog() { func (c *core) processBacklog() {
c.backlogsMu.Lock() c.backlogsMu.Lock()
defer c.backlogsMu.Unlock() defer c.backlogsMu.Unlock()
for src, backlog := range c.backlogs { for srcAddress, backlog := range c.backlogs {
if backlog == nil { if backlog == nil {
continue continue
} }
_, src := c.valSet.GetByAddress(srcAddress)
if src == nil {
// validator is not available
delete(c.backlogs, srcAddress)
continue
}
logger := c.logger.New("from", src, "state", c.state) logger := c.logger.New("from", src, "state", c.state)
isFuture := false isFuture := false

View File

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"gopkg.in/karalabe/cookiejar.v2/collections/prque" "gopkg.in/karalabe/cookiejar.v2/collections/prque"
@ -168,14 +167,15 @@ func TestCheckMessage(t *testing.T) {
func TestStoreBacklog(t *testing.T) { func TestStoreBacklog(t *testing.T) {
c := &core{ c := &core{
logger: log.New("backend", "test", "id", 0), logger: log.New("backend", "test", "id", 0),
backlogs: make(map[istanbul.Validator]*prque.Prque), valSet: newTestValidatorSet(1),
backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex), backlogsMu: new(sync.Mutex),
} }
v := &istanbul.View{ v := &istanbul.View{
Round: big.NewInt(10), Round: big.NewInt(10),
Sequence: big.NewInt(10), Sequence: big.NewInt(10),
} }
p := validator.New(common.StringToAddress("12345667890")) p := c.valSet.GetByIndex(0)
// push preprepare msg // push preprepare msg
preprepare := &istanbul.Preprepare{ preprepare := &istanbul.Preprepare{
View: v, View: v,
@ -187,7 +187,7 @@ func TestStoreBacklog(t *testing.T) {
Msg: prepreparePayload, Msg: prepreparePayload,
} }
c.storeBacklog(m, p) c.storeBacklog(m, p)
msg := c.backlogs[p].PopItem() msg := c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) { if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m) t.Errorf("message mismatch: have %v, want %v", msg, m)
} }
@ -204,7 +204,7 @@ func TestStoreBacklog(t *testing.T) {
Msg: subjectPayload, Msg: subjectPayload,
} }
c.storeBacklog(m, p) c.storeBacklog(m, p)
msg = c.backlogs[p].PopItem() msg = c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) { if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m) t.Errorf("message mismatch: have %v, want %v", msg, m)
} }
@ -215,7 +215,7 @@ func TestStoreBacklog(t *testing.T) {
Msg: subjectPayload, Msg: subjectPayload,
} }
c.storeBacklog(m, p) c.storeBacklog(m, p)
msg = c.backlogs[p].PopItem() msg = c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) { if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m) t.Errorf("message mismatch: have %v, want %v", msg, m)
} }
@ -226,7 +226,7 @@ func TestStoreBacklog(t *testing.T) {
Msg: subjectPayload, Msg: subjectPayload,
} }
c.storeBacklog(m, p) c.storeBacklog(m, p)
msg = c.backlogs[p].PopItem() msg = c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) { if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m) t.Errorf("message mismatch: have %v, want %v", msg, m)
} }
@ -238,7 +238,8 @@ func TestProcessFutureBacklog(t *testing.T) {
} }
c := &core{ c := &core{
logger: log.New("backend", "test", "id", 0), logger: log.New("backend", "test", "id", 0),
backlogs: make(map[istanbul.Validator]*prque.Prque), valSet: newTestValidatorSet(1),
backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex), backlogsMu: new(sync.Mutex),
backend: backend, backend: backend,
current: newRoundState(&istanbul.View{ current: newRoundState(&istanbul.View{
@ -254,7 +255,7 @@ func TestProcessFutureBacklog(t *testing.T) {
Round: big.NewInt(10), Round: big.NewInt(10),
Sequence: big.NewInt(10), Sequence: big.NewInt(10),
} }
p := validator.New(common.StringToAddress("12345667890")) p := c.valSet.GetByIndex(0)
// push a future msg // push a future msg
subject := &istanbul.Subject{ subject := &istanbul.Subject{
View: v, View: v,
@ -329,8 +330,9 @@ func testProcessBacklog(t *testing.T, msg *message) {
} }
c := &core{ c := &core{
logger: log.New("backend", "test", "id", 0), logger: log.New("backend", "test", "id", 0),
backlogs: make(map[istanbul.Validator]*prque.Prque), backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex), backlogsMu: new(sync.Mutex),
valSet: vset,
backend: backend, backend: backend,
state: State(msg.Code), state: State(msg.Code),
current: newRoundState(&istanbul.View{ current: newRoundState(&istanbul.View{

View File

@ -42,7 +42,7 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine {
handlerWg: new(sync.WaitGroup), handlerWg: new(sync.WaitGroup),
logger: log.New("address", backend.Address()), logger: log.New("address", backend.Address()),
backend: backend, backend: backend,
backlogs: make(map[istanbul.Validator]*prque.Prque), backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex), backlogsMu: new(sync.Mutex),
pendingRequests: prque.New(), pendingRequests: prque.New(),
pendingRequestsMu: new(sync.Mutex), pendingRequestsMu: new(sync.Mutex),
@ -73,7 +73,7 @@ type core struct {
waitingForRoundChange bool waitingForRoundChange bool
validateFn func([]byte, []byte) (common.Address, error) validateFn func([]byte, []byte) (common.Address, error)
backlogs map[istanbul.Validator]*prque.Prque backlogs map[common.Address]*prque.Prque
backlogsMu *sync.Mutex backlogsMu *sync.Mutex
current *roundState current *roundState