From 7c9508ed7160786b6bd87d59d898213bfbdeced5 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 8 Oct 2014 16:20:28 +0200 Subject: [PATCH 1/4] eventer: fix tests --- eventer/eventer_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/eventer/eventer_test.go b/eventer/eventer_test.go index b35267af6..6891622e3 100644 --- a/eventer/eventer_test.go +++ b/eventer/eventer_test.go @@ -3,7 +3,7 @@ package eventer import "testing" func TestChannel(t *testing.T) { - eventer := New(nil) + eventer := New() c := make(Channel, 1) eventer.RegisterChannel("test", c) @@ -17,7 +17,7 @@ func TestChannel(t *testing.T) { } func TestFunction(t *testing.T) { - eventer := New(nil) + eventer := New() var data string eventer.RegisterFunc("test", func(ev Event) { @@ -31,7 +31,7 @@ func TestFunction(t *testing.T) { } func TestRegister(t *testing.T) { - eventer := New(nil) + eventer := New() c := eventer.Register("test") eventer.Post("test", "hello world") @@ -44,7 +44,7 @@ func TestRegister(t *testing.T) { } func TestOn(t *testing.T) { - eventer := New(nil) + eventer := New() c := make(Channel, 1) eventer.On("test", c) From d4512699775497abd5392aa4c617350491021630 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 8 Oct 2014 16:20:44 +0200 Subject: [PATCH 2/4] eventer: add test for concurrent Post/Register This test reports the race condition when run using "go test -race". --- eventer/eventer_test.go | 49 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/eventer/eventer_test.go b/eventer/eventer_test.go index 6891622e3..a5db6d901 100644 --- a/eventer/eventer_test.go +++ b/eventer/eventer_test.go @@ -1,6 +1,10 @@ package eventer -import "testing" +import ( + "math/rand" + "testing" + "time" +) func TestChannel(t *testing.T) { eventer := New() @@ -64,3 +68,46 @@ func TestOn(t *testing.T) { t.Error("Expected function event with data 'hello world'. Got", data) } } + +func TestConcurrentUsage(t *testing.T) { + rand.Seed(time.Now().Unix()) + eventer := New() + stop := make(chan struct{}) + recv := make(chan int) + poster := func() { + for { + select { + case <-stop: + return + default: + eventer.Post("test", "hi") + } + } + } + listener := func(i int) { + time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) + c := eventer.Register("test") + // wait for the first event + <-c + recv <- i + // keep receiving to prevent deadlock + for { + select { + case <-stop: + return + case <-c: + } + } + } + + nlisteners := 200 + go poster() + for i := 0; i < nlisteners; i++ { + go listener(i) + } + // wait until everyone has been served + for i := 0; i < nlisteners; i++ { + <-recv + } + close(stop) +} From 44674cb96c64ddf9a8b3345f14329c030ecd4ed6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 8 Oct 2014 16:26:14 +0200 Subject: [PATCH 3/4] eventer: fix data race --- eventer/eventer.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/eventer/eventer.go b/eventer/eventer.go index fb2f299a3..6e5ee2ec5 100644 --- a/eventer/eventer.go +++ b/eventer/eventer.go @@ -1,5 +1,7 @@ package eventer +import "sync" + // Basic receiver interface. type Receiver interface { Send(Event) @@ -27,17 +29,18 @@ type Event struct { type Channels map[string][]Receiver type EventMachine struct { + mu sync.RWMutex channels Channels } func New() *EventMachine { - return &EventMachine{ - channels: make(Channels), - } + return &EventMachine{channels: make(Channels)} } func (self *EventMachine) add(typ string, r Receiver) { + self.mu.Lock() self.channels[typ] = append(self.channels[typ], r) + self.mu.Unlock() } // Generalised methods for the known receiver types @@ -64,11 +67,11 @@ func (self *EventMachine) RegisterFunc(typ string, f Function) { func (self *EventMachine) Register(typ string) Channel { c := make(Channel, 1) self.add(typ, c) - return c } func (self *EventMachine) Post(typ string, data interface{}) { + self.mu.RLock() if self.channels[typ] != nil { ev := Event{typ, data} for _, receiver := range self.channels[typ] { @@ -76,4 +79,5 @@ func (self *EventMachine) Post(typ string, data interface{}) { receiver.Send(ev) } } + self.mu.RUnlock() } From e83a99903994eaadec3b58822dd18682649ac9dc Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 8 Oct 2014 19:04:58 +0200 Subject: [PATCH 4/4] eth: fix filter map data race This commit also documents (but doesn't enforce) that filters are immutable while they're installed. This required a minor API change. --- ethereum.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/ethereum.go b/ethereum.go index b83ceb12f..204f30bec 100644 --- a/ethereum.go +++ b/ethereum.go @@ -95,7 +95,9 @@ type Ethereum struct { isUpToDate bool - filters map[int]*ethchain.Filter + filterMu sync.RWMutex + filterId int + filters map[int]*ethchain.Filter } func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { @@ -594,22 +596,29 @@ out: } } -var filterId = 0 - -func (self *Ethereum) InstallFilter(object map[string]interface{}) (*ethchain.Filter, int) { - defer func() { filterId++ }() - - filter := ethchain.NewFilterFromMap(object, self) - self.filters[filterId] = filter - - return filter, filterId +// InstallFilter adds filter for blockchain events. +// The filter's callbacks will run for matching blocks and messages. +// The filter should not be modified after it has been installed. +func (self *Ethereum) InstallFilter(filter *ethchain.Filter) (id int) { + self.filterMu.Lock() + id = self.filterId + self.filters[id] = filter + self.filterId++ + self.filterMu.Unlock() + return id } func (self *Ethereum) UninstallFilter(id int) { + self.filterMu.Lock() delete(self.filters, id) + self.filterMu.Unlock() } +// GetFilter retrieves a filter installed using InstallFilter. +// The filter may not be modified. func (self *Ethereum) GetFilter(id int) *ethchain.Filter { + self.filterMu.RLock() + defer self.filterMu.RUnlock() return self.filters[id] } @@ -627,14 +636,17 @@ out: break out case block := <-blockChan: if block, ok := block.Resource.(*ethchain.Block); ok { + self.filterMu.RLock() for _, filter := range self.filters { if filter.BlockCallback != nil { filter.BlockCallback(block) } } + self.filterMu.RUnlock() } case msg := <-messageChan: if messages, ok := msg.Resource.(ethstate.Messages); ok { + self.filterMu.RLock() for _, filter := range self.filters { if filter.MessageCallback != nil { msgs := filter.FilterMessages(messages) @@ -643,6 +655,7 @@ out: } } } + self.filterMu.RUnlock() } } }