quorum/ethreact/reactor.go

184 lines
4.1 KiB
Go

package ethreact
import (
"sync"
"github.com/ethereum/eth-go/ethlog"
)
var logger = ethlog.NewLogger("REACTOR")
const (
eventBufferSize int = 10
)
type EventHandler struct {
lock sync.RWMutex
name string
chans []chan Event
}
// Post the Event with the reactor resource on the channels
// currently subscribed to the event
func (e *EventHandler) Post(event Event) {
e.lock.RLock()
defer e.lock.RUnlock()
// if we want to preserve order pushing to subscibed channels
// dispatching should be syncrounous
// this means if subscribed event channel is blocked
// the reactor dispatch will be blocked, so we need to mitigate by skipping
// rogue blocking subscribers
for i, ch := range e.chans {
select {
case ch <- event:
default:
logger.Debugf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
}
}
}
// Add a subscriber to this event
func (e *EventHandler) Add(ch chan Event) {
e.lock.Lock()
defer e.lock.Unlock()
e.chans = append(e.chans, ch)
}
// Remove a subscriber
func (e *EventHandler) Remove(ch chan Event) int {
e.lock.Lock()
defer e.lock.Unlock()
for i, c := range e.chans {
if c == ch {
e.chans = append(e.chans[:i], e.chans[i+1:]...)
}
}
return len(e.chans)
}
// Basic reactor event
type Event struct {
Resource interface{}
Name string
}
// The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters
type ReactorEngine struct {
lock sync.RWMutex
eventChannel chan Event
eventHandlers map[string]*EventHandler
quit chan chan error
running bool
drained chan bool
}
func New() *ReactorEngine {
return &ReactorEngine{
eventHandlers: make(map[string]*EventHandler),
eventChannel: make(chan Event, eventBufferSize),
quit: make(chan chan error, 1),
drained: make(chan bool, 1),
}
}
func (reactor *ReactorEngine) Start() {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if !reactor.running {
go func() {
for {
select {
case status := <-reactor.quit:
reactor.lock.Lock()
defer reactor.lock.Unlock()
reactor.running = false
logger.Infoln("stopped")
status <- nil
return
case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events
reactor.dispatch(event)
default:
reactor.drained <- true // blocking till message is coming in
}
}
}()
reactor.running = true
logger.Infoln("started")
}
}
func (reactor *ReactorEngine) Stop() {
if reactor.running {
status := make(chan error)
reactor.quit <- status
select {
case <-reactor.drained:
default:
}
<-status
}
}
func (reactor *ReactorEngine) Flush() {
<-reactor.drained
}
// Subscribe a channel to the specified event
func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
// Create a new event handler if one isn't available
if eventHandler == nil {
eventHandler = &EventHandler{name: event}
reactor.eventHandlers[event] = eventHandler
}
// Add the events channel to reactor event handler
eventHandler.Add(eventChannel)
logger.Debugf("added new subscription to %s", event)
}
func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
if eventHandler != nil {
len := eventHandler.Remove(eventChannel)
if len == 0 {
reactor.eventHandlers[event] = nil
}
logger.Debugf("removed subscription to %s", event)
}
}
func (reactor *ReactorEngine) Post(event string, resource interface{}) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if reactor.running {
reactor.eventChannel <- Event{Resource: resource, Name: event}
select {
case <-reactor.drained:
default:
}
}
}
func (reactor *ReactorEngine) dispatch(event Event) {
name := event.Name
eventHandler := reactor.eventHandlers[name]
// if no subscriptions to this event type - no event handler created
// then noone to notify
if eventHandler != nil {
// needs to be called syncronously
eventHandler.Post(event)
}
}