mirror of https://github.com/poanetwork/quorum.git
Merge pull request #521 from trung/f-istanbul-backlogs
Fixes #481 memory leak in `backlogs` .
This commit is contained in:
commit
a34b72577e
|
@ -88,7 +88,8 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
|
|||
c.backlogsMu.Lock()
|
||||
defer c.backlogsMu.Unlock()
|
||||
|
||||
backlog := c.backlogs[src]
|
||||
logger.Debug("Retrieving backlog queue", "for", src.Address(), "backlogs_size", len(c.backlogs))
|
||||
backlog := c.backlogs[src.Address()]
|
||||
if backlog == nil {
|
||||
backlog = prque.New()
|
||||
}
|
||||
|
@ -107,18 +108,23 @@ func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
|
|||
backlog.Push(msg, toPriority(msg.Code, p.View))
|
||||
}
|
||||
}
|
||||
c.backlogs[src] = backlog
|
||||
c.backlogs[src.Address()] = backlog
|
||||
}
|
||||
|
||||
func (c *core) processBacklog() {
|
||||
c.backlogsMu.Lock()
|
||||
defer c.backlogsMu.Unlock()
|
||||
|
||||
for src, backlog := range c.backlogs {
|
||||
for srcAddress, backlog := range c.backlogs {
|
||||
if backlog == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
_, src := c.valSet.GetByAddress(srcAddress)
|
||||
if src == nil {
|
||||
// validator is not available
|
||||
delete(c.backlogs, srcAddress)
|
||||
continue
|
||||
}
|
||||
logger := c.logger.New("from", src, "state", c.state)
|
||||
isFuture := false
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
||||
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
|
||||
|
@ -168,14 +167,15 @@ func TestCheckMessage(t *testing.T) {
|
|||
func TestStoreBacklog(t *testing.T) {
|
||||
c := &core{
|
||||
logger: log.New("backend", "test", "id", 0),
|
||||
backlogs: make(map[istanbul.Validator]*prque.Prque),
|
||||
valSet: newTestValidatorSet(1),
|
||||
backlogs: make(map[common.Address]*prque.Prque),
|
||||
backlogsMu: new(sync.Mutex),
|
||||
}
|
||||
v := &istanbul.View{
|
||||
Round: big.NewInt(10),
|
||||
Sequence: big.NewInt(10),
|
||||
}
|
||||
p := validator.New(common.StringToAddress("12345667890"))
|
||||
p := c.valSet.GetByIndex(0)
|
||||
// push preprepare msg
|
||||
preprepare := &istanbul.Preprepare{
|
||||
View: v,
|
||||
|
@ -187,7 +187,7 @@ func TestStoreBacklog(t *testing.T) {
|
|||
Msg: prepreparePayload,
|
||||
}
|
||||
c.storeBacklog(m, p)
|
||||
msg := c.backlogs[p].PopItem()
|
||||
msg := c.backlogs[p.Address()].PopItem()
|
||||
if !reflect.DeepEqual(msg, m) {
|
||||
t.Errorf("message mismatch: have %v, want %v", msg, m)
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ func TestStoreBacklog(t *testing.T) {
|
|||
Msg: subjectPayload,
|
||||
}
|
||||
c.storeBacklog(m, p)
|
||||
msg = c.backlogs[p].PopItem()
|
||||
msg = c.backlogs[p.Address()].PopItem()
|
||||
if !reflect.DeepEqual(msg, m) {
|
||||
t.Errorf("message mismatch: have %v, want %v", msg, m)
|
||||
}
|
||||
|
@ -215,7 +215,7 @@ func TestStoreBacklog(t *testing.T) {
|
|||
Msg: subjectPayload,
|
||||
}
|
||||
c.storeBacklog(m, p)
|
||||
msg = c.backlogs[p].PopItem()
|
||||
msg = c.backlogs[p.Address()].PopItem()
|
||||
if !reflect.DeepEqual(msg, m) {
|
||||
t.Errorf("message mismatch: have %v, want %v", msg, m)
|
||||
}
|
||||
|
@ -226,7 +226,7 @@ func TestStoreBacklog(t *testing.T) {
|
|||
Msg: subjectPayload,
|
||||
}
|
||||
c.storeBacklog(m, p)
|
||||
msg = c.backlogs[p].PopItem()
|
||||
msg = c.backlogs[p.Address()].PopItem()
|
||||
if !reflect.DeepEqual(msg, m) {
|
||||
t.Errorf("message mismatch: have %v, want %v", msg, m)
|
||||
}
|
||||
|
@ -238,7 +238,8 @@ func TestProcessFutureBacklog(t *testing.T) {
|
|||
}
|
||||
c := &core{
|
||||
logger: log.New("backend", "test", "id", 0),
|
||||
backlogs: make(map[istanbul.Validator]*prque.Prque),
|
||||
valSet: newTestValidatorSet(1),
|
||||
backlogs: make(map[common.Address]*prque.Prque),
|
||||
backlogsMu: new(sync.Mutex),
|
||||
backend: backend,
|
||||
current: newRoundState(&istanbul.View{
|
||||
|
@ -254,7 +255,7 @@ func TestProcessFutureBacklog(t *testing.T) {
|
|||
Round: big.NewInt(10),
|
||||
Sequence: big.NewInt(10),
|
||||
}
|
||||
p := validator.New(common.StringToAddress("12345667890"))
|
||||
p := c.valSet.GetByIndex(0)
|
||||
// push a future msg
|
||||
subject := &istanbul.Subject{
|
||||
View: v,
|
||||
|
@ -329,8 +330,9 @@ func testProcessBacklog(t *testing.T, msg *message) {
|
|||
}
|
||||
c := &core{
|
||||
logger: log.New("backend", "test", "id", 0),
|
||||
backlogs: make(map[istanbul.Validator]*prque.Prque),
|
||||
backlogs: make(map[common.Address]*prque.Prque),
|
||||
backlogsMu: new(sync.Mutex),
|
||||
valSet: vset,
|
||||
backend: backend,
|
||||
state: State(msg.Code),
|
||||
current: newRoundState(&istanbul.View{
|
||||
|
|
|
@ -42,7 +42,7 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine {
|
|||
handlerWg: new(sync.WaitGroup),
|
||||
logger: log.New("address", backend.Address()),
|
||||
backend: backend,
|
||||
backlogs: make(map[istanbul.Validator]*prque.Prque),
|
||||
backlogs: make(map[common.Address]*prque.Prque),
|
||||
backlogsMu: new(sync.Mutex),
|
||||
pendingRequests: prque.New(),
|
||||
pendingRequestsMu: new(sync.Mutex),
|
||||
|
@ -73,7 +73,7 @@ type core struct {
|
|||
waitingForRoundChange bool
|
||||
validateFn func([]byte, []byte) (common.Address, error)
|
||||
|
||||
backlogs map[istanbul.Validator]*prque.Prque
|
||||
backlogs map[common.Address]*prque.Prque
|
||||
backlogsMu *sync.Mutex
|
||||
|
||||
current *roundState
|
||||
|
|
Loading…
Reference in New Issue