From 0c6f1c9c3aaba8e5a15ed50e90e65dd344afef8a Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 4 Jul 2014 19:38:53 +0100 Subject: [PATCH] ethreact - consistent renaming - React -> Event - ReactorEvent -> EventHandler - NewReactorEngine -> New - async ReactorEngine main loop with select on main eventChannel and quit channel - ReactorEngine main loop control with Start() Stop() Flush() - ReactorEngine.dispatch - use sync.RWMutex - delete eventHandler if subscribed channels go to 0 --- ethreact/README.md | 40 +++++++++ ethreact/reactor.go | 175 +++++++++++++++++++++++++++++++++++++++ ethreact/reactor_test.go | 63 ++++++++++++++ 3 files changed, 278 insertions(+) create mode 100644 ethreact/README.md create mode 100644 ethreact/reactor.go create mode 100644 ethreact/reactor_test.go diff --git a/ethreact/README.md b/ethreact/README.md new file mode 100644 index 000000000..592b50b96 --- /dev/null +++ b/ethreact/README.md @@ -0,0 +1,40 @@ +# ethreact + +ethereum event reactor. Component of the ethereum stack. +various events like state change on an account or new block found are broadcast to subscribers. +Broadcasting to subscribers is running on its own routine and globally order preserving. + +## Clients +### subscribe + + eventChannel := make(chan ethreact.Event) + reactor.Subscribe(event, eventChannel) + +The same channel can be subscribed to multiple events but only once for each event. In order to allow order of events to be preserved, broadcast of events is synchronous within the main broadcast loop. Therefore any blocking subscriber channels will be skipped, i.e. missing broadcasting events while they are blocked. + +### unsubscribe + + reactor.Unsubscribe(event, eventChannel) + +### Processing events + +event.Resource is of type interface{}. The actual type of event.Resource depends on event.Name and may need to be cast for processing. + + var event ethreact.Event + for { + select { + case event = <-eventChannel: + processTransaction(event.Resource.(Transaction)) + } + } + +## Broadcast + + reactor := ethreact.New() + reactor.Start() + reactor.Post(name, resource) + reactor.Flush() // wait till all broadcast messages are dispatched + reactor.Stop() // stop the main broadcast loop immediately (even if there are unbroadcast events left) + + + diff --git a/ethreact/reactor.go b/ethreact/reactor.go new file mode 100644 index 000000000..3802d95b3 --- /dev/null +++ b/ethreact/reactor.go @@ -0,0 +1,175 @@ +package ethreact + +import ( + "github.com/ethereum/eth-go/ethlog" + "sync" +) + +var logger = ethlog.NewLogger("REACTOR") + +type EventHandler struct { + lock sync.RWMutex + name string + chans []chan Event +} + +// Post the Event with the reactor resource on the channels +// currently subscribed to the event +func (e *EventHandler) Post(event Event) { + e.lock.RLock() + defer e.lock.RUnlock() + + // if we want to preserve order pushing to subscibed channels + // dispatching should be syncrounous + // this means if subscribed event channel is blocked (closed or has fixed capacity) + // the reactor dispatch will be blocked, so we need to mitigate by skipping + // rogue blocking subscribers + for i, ch := range e.chans { + select { + case ch <- event: + default: + logger.Warnln("subscribing channel %d to event %s blocked. skipping", i, event.Name) + } + } +} + +// Add a subscriber to this event +func (e *EventHandler) Add(ch chan Event) { + e.lock.Lock() + defer e.lock.Unlock() + + e.chans = append(e.chans, ch) +} + +// Remove a subscriber +func (e *EventHandler) Remove(ch chan Event) int { + e.lock.Lock() + defer e.lock.Unlock() + + for i, c := range e.chans { + if c == ch { + e.chans = append(e.chans[:i], e.chans[i+1:]...) + } + } + return len(e.chans) +} + +// Basic reactor resource +type Event struct { + Resource interface{} + Name string +} + +// The reactor basic engine. Acts as bridge +// between the events and the subscribers/posters +type ReactorEngine struct { + lock sync.RWMutex + eventChannel chan Event + eventHandlers map[string]*EventHandler + quit chan bool + shutdownChannel chan bool + running bool + drained bool +} + +func New() *ReactorEngine { + return &ReactorEngine{ + eventHandlers: make(map[string]*EventHandler), + eventChannel: make(chan Event), + quit: make(chan bool, 1), + shutdownChannel: make(chan bool, 1), + } +} + +func (reactor *ReactorEngine) Start() { + reactor.lock.Lock() + defer reactor.lock.Unlock() + if !reactor.running { + go func() { + out: + for { + select { + case <-reactor.quit: + break out + case event := <-reactor.eventChannel: + // needs to be called syncronously to keep order of events + reactor.dispatch(event) + default: + reactor.drained = true + } + } + reactor.lock.Lock() + defer reactor.lock.Unlock() + reactor.running = false + logger.Infoln("stopped") + close(reactor.shutdownChannel) + }() + reactor.running = true + logger.Infoln("started") + } +} + +func (reactor *ReactorEngine) Stop() { + reactor.lock.RLock() + if reactor.running { + reactor.quit <- true + } + reactor.lock.RUnlock() + <-reactor.shutdownChannel +} + +func (reactor *ReactorEngine) Flush() { + for !reactor.drained { + } +} + +// Subscribe a channel to the specified event +func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + // Create a new event handler if one isn't available + if eventHandler == nil { + eventHandler = &EventHandler{name: event} + reactor.eventHandlers[event] = eventHandler + } + // Add the events channel to reactor event handler + eventHandler.Add(eventChannel) + logger.Debugln("added new subscription to %s", event) +} + +func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + if eventHandler != nil { + len := eventHandler.Remove(eventChannel) + if len == 0 { + reactor.eventHandlers[event] = nil + } + logger.Debugln("removed subscription to %s", event) + } +} + +func (reactor *ReactorEngine) Post(event string, resource interface{}) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + if reactor.running { + reactor.drained = false + reactor.eventChannel <- Event{Resource: resource, Name: event} + } +} + +func (reactor *ReactorEngine) dispatch(event Event) { + name := event.Name + eventHandler := reactor.eventHandlers[name] + // if no subscriptions to this event type - no event handler created + // then noone to notify + if eventHandler != nil { + // needs to be called syncronously + eventHandler.Post(event) + } +} diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go new file mode 100644 index 000000000..801a8abd0 --- /dev/null +++ b/ethreact/reactor_test.go @@ -0,0 +1,63 @@ +package ethreact + +import ( + "fmt" + "testing" +) + +func TestReactorAdd(t *testing.T) { + reactor := New() + ch := make(chan Event) + reactor.Subscribe("test", ch) + if reactor.eventHandlers["test"] == nil { + t.Error("Expected new eventHandler to be created") + } + reactor.Unsubscribe("test", ch) + if reactor.eventHandlers["test"] != nil { + t.Error("Expected eventHandler to be removed") + } +} + +func TestReactorEvent(t *testing.T) { + var name string + reactor := New() + // Buffer the channel, so it doesn't block for this test + cap := 20 + ch := make(chan Event, cap) + reactor.Subscribe("even", ch) + reactor.Subscribe("odd", ch) + reactor.Post("even", "disappears") // should not broadcast if engine not started + reactor.Start() + for i := 0; i < cap; i++ { + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + reactor.Post(name, i) + } + reactor.Post("test", cap) // this should not block + i := 0 + reactor.Flush() + close(ch) + for event := range ch { + fmt.Printf("%d: %v", i, event) + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + if val, ok := event.Resource.(int); ok { + if i != val || event.Name != name { + t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) + } + } else { + t.Error("Unable to cast") + } + i++ + } + if i != cap { + t.Error("excpected exactly %d events, got ", i) + } + reactor.Stop() +}