Merge pull request #33 from ethersphere/feature/ethutil-refactor

ethreact - Feature/ethutil refactor
This commit is contained in:
Jeffrey Wilcke 2014-07-07 10:52:02 +02:00
commit b958179263
11 changed files with 353 additions and 159 deletions

View File

@ -3,6 +3,7 @@ package ethchain
import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/obscuren/sha3"
"hash"
@ -14,7 +15,7 @@ import (
var powlogger = ethlog.NewLogger("POW")
type PoW interface {
Search(block *Block, reactChan chan ethutil.React) []byte
Search(block *Block, reactChan chan ethreact.Event) []byte
Verify(hash []byte, diff *big.Int, nonce []byte) bool
}
@ -22,7 +23,7 @@ type EasyPow struct {
hash *big.Int
}
func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte {
func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
hash := block.HashNoNonce()
diff := block.Difficulty

View File

@ -6,6 +6,7 @@ import (
"fmt"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethtrie"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
@ -36,7 +37,7 @@ type EthManager interface {
BlockChain() *BlockChain
TxPool() *TxPool
Broadcast(msgType ethwire.MsgType, data []interface{})
Reactor() *ethutil.ReactorEngine
Reactor() *ethreact.ReactorEngine
PeerCount() int
IsMining() bool
IsListening() bool

View File

@ -6,6 +6,7 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
@ -71,7 +72,7 @@ type Ethereum struct {
listening bool
reactor *ethutil.ReactorEngine
reactor *ethreact.ReactorEngine
RpcServer *ethrpc.JsonRpcServer
@ -106,7 +107,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
keyManager: keyManager,
clientIdentity: clientIdentity,
}
ethereum.reactor = ethutil.NewReactorEngine()
ethereum.reactor = ethreact.New()
ethereum.txPool = ethchain.NewTxPool(ethereum)
ethereum.blockChain = ethchain.NewBlockChain(ethereum)
@ -118,7 +119,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
return ethereum, nil
}
func (s *Ethereum) Reactor() *ethutil.ReactorEngine {
func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
return s.reactor
}
@ -350,6 +351,7 @@ func (s *Ethereum) ReapDeadPeerHandler() {
// Start the ethereum
func (s *Ethereum) Start(seed bool) {
s.reactor.Start()
// Bind to addr and port
ln, err := net.Listen("tcp", ":"+s.Port)
if err != nil {
@ -461,6 +463,9 @@ func (s *Ethereum) Stop() {
s.txPool.Stop()
s.stateManager.Stop()
s.reactor.Flush()
s.reactor.Stop()
ethlogger.Infoln("Server stopped")
close(s.shutdownChan)
}

View File

@ -40,6 +40,9 @@ func (msg *logMessage) send(logger LogSystem) {
var logMessages chan (*logMessage)
var logSystems []LogSystem
var quit chan bool
var drained chan bool
var shutdown chan bool
var mutex = sync.Mutex{}
type LogLevel uint8
@ -57,29 +60,41 @@ func start() {
out:
for {
select {
case <-quit:
break out
case msg := <-logMessages:
for _, logSystem := range logSystems {
if logSystem.GetLogLevel() >= msg.LogLevel {
msg.send(logSystem)
}
}
case <-quit:
break out
case drained <- true:
default:
drained <- true // this blocks until a message is sent to the queu
}
}
close(shutdown)
}
func Reset() {
mutex.Lock()
defer mutex.Unlock()
if logSystems != nil {
quit <- true
select {
case <-drained:
}
<-shutdown
}
logSystems = nil
}
// waits until log messages are drained (dispatched to log writers)
func Flush() {
quit <- true
done:
for {
select {
case <-logMessages:
default:
break done
}
mutex.Lock()
defer mutex.Unlock()
if logSystems != nil {
<-drained
}
}
@ -92,28 +107,34 @@ func NewLogger(tag string) *Logger {
}
func AddLogSystem(logSystem LogSystem) {
var mutex = &sync.Mutex{}
mutex.Lock()
defer mutex.Unlock()
if logSystems == nil {
logMessages = make(chan *logMessage)
quit = make(chan bool)
drained = make(chan bool, 1)
shutdown = make(chan bool, 1)
go start()
}
logSystems = append(logSystems, logSystem)
}
func send(msg *logMessage) {
select {
case <-drained:
}
logMessages <- msg
}
func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
if logMessages != nil {
msg := newPrintlnLogMessage(level, logger.tag, v...)
logMessages <- msg
if logSystems != nil {
send(newPrintlnLogMessage(level, logger.tag, v...))
}
}
func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
if logMessages != nil {
msg := newPrintfLogMessage(level, logger.tag, format, v...)
logMessages <- msg
if logSystems != nil {
send(newPrintfLogMessage(level, logger.tag, format, v...))
}
}

View File

@ -28,10 +28,6 @@ func (t *TestLogSystem) GetLogLevel() LogLevel {
return t.level
}
func quote(s string) string {
return fmt.Sprintf("'%s'", s)
}
func TestLoggerPrintln(t *testing.T) {
logger := NewLogger("TEST")
testLogSystem := &TestLogSystem{level: WarnLevel}
@ -41,10 +37,10 @@ func TestLoggerPrintln(t *testing.T) {
logger.Infoln("info")
logger.Debugln("debug")
Flush()
Reset()
output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error\n[TEST] warn\n" {
t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output))
t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", testLogSystem.Output)
}
}
@ -57,10 +53,10 @@ func TestLoggerPrintf(t *testing.T) {
logger.Infof("info")
logger.Debugf("debug")
Flush()
Reset()
output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error to { 2}\n[TEST] warn" {
t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output))
t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", testLogSystem.Output)
}
}
@ -73,13 +69,14 @@ func TestMultipleLogSystems(t *testing.T) {
logger.Errorln("error")
logger.Warnln("warn")
Flush()
Reset()
output0 := testLogSystem0.Output
output1 := testLogSystem1.Output
if output0 != "[TEST] error\n" {
t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output))
t.Error("Expected logger 0 output '[TEST] error\\n', got ", testLogSystem0.Output)
}
if output1 != "[TEST] error\n[TEST] warn\n" {
t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output))
t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", testLogSystem1.Output)
}
}
@ -92,11 +89,11 @@ func TestFileLogSystem(t *testing.T) {
logger.Errorf("error to %s\n", filename)
logger.Warnln("warn")
Flush()
Reset()
contents, _ := ioutil.ReadFile(filename)
output := string(contents)
fmt.Println(quote(output))
if output != "[TEST] error to test.log\n[TEST] warn\n" {
t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output))
t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output)
} else {
os.Remove(filename)
}
@ -105,5 +102,7 @@ func TestFileLogSystem(t *testing.T) {
func TestNoLogSystem(t *testing.T) {
logger := NewLogger("TEST")
logger.Warnln("warn")
fmt.Println("1")
Flush()
Reset()
}

View File

@ -4,7 +4,7 @@ import (
"bytes"
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethwire"
"sort"
)
@ -15,19 +15,19 @@ type Miner struct {
pow ethchain.PoW
ethereum ethchain.EthManager
coinbase []byte
reactChan chan ethutil.React
reactChan chan ethreact.Event
txs ethchain.Transactions
uncles []*ethchain.Block
block *ethchain.Block
powChan chan []byte
powQuitChan chan ethutil.React
powQuitChan chan ethreact.Event
quitChan chan bool
}
func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner {
reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread
reactChan := make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
powQuitChan := make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
quitChan := make(chan bool, 1)
ethereum.Reactor().Subscribe("newBlock", reactChan)

40
ethreact/README.md Normal file
View File

@ -0,0 +1,40 @@
# ethreact
ethereum event reactor. Component of the ethereum stack.
various events like state change on an account or new block found are broadcast to subscribers.
Broadcasting to subscribers is running on its own routine and globally order preserving.
## Clients
### subscribe
eventChannel := make(chan ethreact.Event)
reactor.Subscribe(event, eventChannel)
The same channel can be subscribed to multiple events but only once for each event. In order to allow order of events to be preserved, broadcast of events is synchronous within the main broadcast loop. Therefore any blocking subscriber channels will be skipped, i.e. missing broadcasting events while they are blocked.
### unsubscribe
reactor.Unsubscribe(event, eventChannel)
### Processing events
event.Resource is of type interface{}. The actual type of event.Resource depends on event.Name and may need to be cast for processing.
var event ethreact.Event
for {
select {
case event = <-eventChannel:
processTransaction(event.Resource.(Transaction))
}
}
## Broadcast
reactor := ethreact.New()
reactor.Start()
reactor.Post(name, resource)
reactor.Flush() // wait till all broadcast messages are dispatched
reactor.Stop() // stop the main broadcast loop immediately (even if there are unbroadcast events left)

181
ethreact/reactor.go Normal file
View File

@ -0,0 +1,181 @@
package ethreact
import (
"github.com/ethereum/eth-go/ethlog"
"sync"
)
var logger = ethlog.NewLogger("REACTOR")
type EventHandler struct {
lock sync.RWMutex
name string
chans []chan Event
}
// Post the Event with the reactor resource on the channels
// currently subscribed to the event
func (e *EventHandler) Post(event Event) {
e.lock.RLock()
defer e.lock.RUnlock()
// if we want to preserve order pushing to subscibed channels
// dispatching should be syncrounous
// this means if subscribed event channel is blocked (closed or has fixed capacity)
// the reactor dispatch will be blocked, so we need to mitigate by skipping
// rogue blocking subscribers
for i, ch := range e.chans {
select {
case ch <- event:
default:
logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
}
}
}
// Add a subscriber to this event
func (e *EventHandler) Add(ch chan Event) {
e.lock.Lock()
defer e.lock.Unlock()
e.chans = append(e.chans, ch)
}
// Remove a subscriber
func (e *EventHandler) Remove(ch chan Event) int {
e.lock.Lock()
defer e.lock.Unlock()
for i, c := range e.chans {
if c == ch {
e.chans = append(e.chans[:i], e.chans[i+1:]...)
}
}
return len(e.chans)
}
// Basic reactor resource
type Event struct {
Resource interface{}
Name string
}
// The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters
type ReactorEngine struct {
lock sync.RWMutex
eventChannel chan Event
eventHandlers map[string]*EventHandler
quit chan bool
shutdownChannel chan bool
running bool
drained chan bool
}
func New() *ReactorEngine {
return &ReactorEngine{
eventHandlers: make(map[string]*EventHandler),
eventChannel: make(chan Event),
quit: make(chan bool, 1),
drained: make(chan bool, 1),
shutdownChannel: make(chan bool, 1),
}
}
func (reactor *ReactorEngine) Start() {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if !reactor.running {
go func() {
out:
for {
select {
case <-reactor.quit:
break out
case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events
reactor.dispatch(event)
case reactor.drained <- true:
default:
reactor.drained <- true // blocking till message is coming in
}
}
reactor.lock.Lock()
defer reactor.lock.Unlock()
reactor.running = false
logger.Infoln("stopped")
close(reactor.shutdownChannel)
}()
reactor.running = true
logger.Infoln("started")
}
}
func (reactor *ReactorEngine) Stop() {
reactor.lock.RLock()
if reactor.running {
reactor.quit <- true
select {
case <-reactor.drained:
}
}
reactor.lock.RUnlock()
<-reactor.shutdownChannel
}
func (reactor *ReactorEngine) Flush() {
<-reactor.drained
}
// Subscribe a channel to the specified event
func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
// Create a new event handler if one isn't available
if eventHandler == nil {
eventHandler = &EventHandler{name: event}
reactor.eventHandlers[event] = eventHandler
}
// Add the events channel to reactor event handler
eventHandler.Add(eventChannel)
logger.Debugf("added new subscription to %s", event)
}
func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
if eventHandler != nil {
len := eventHandler.Remove(eventChannel)
if len == 0 {
reactor.eventHandlers[event] = nil
}
logger.Debugf("removed subscription to %s", event)
}
}
func (reactor *ReactorEngine) Post(event string, resource interface{}) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if reactor.running {
reactor.eventChannel <- Event{Resource: resource, Name: event}
select {
case <-reactor.drained:
}
}
}
func (reactor *ReactorEngine) dispatch(event Event) {
name := event.Name
eventHandler := reactor.eventHandlers[name]
// if no subscriptions to this event type - no event handler created
// then noone to notify
if eventHandler != nil {
// needs to be called syncronously
eventHandler.Post(event)
}
}

63
ethreact/reactor_test.go Normal file
View File

@ -0,0 +1,63 @@
package ethreact
import (
"fmt"
"testing"
)
func TestReactorAdd(t *testing.T) {
reactor := New()
ch := make(chan Event)
reactor.Subscribe("test", ch)
if reactor.eventHandlers["test"] == nil {
t.Error("Expected new eventHandler to be created")
}
reactor.Unsubscribe("test", ch)
if reactor.eventHandlers["test"] != nil {
t.Error("Expected eventHandler to be removed")
}
}
func TestReactorEvent(t *testing.T) {
var name string
reactor := New()
// Buffer the channel, so it doesn't block for this test
cap := 20
ch := make(chan Event, cap)
reactor.Subscribe("even", ch)
reactor.Subscribe("odd", ch)
reactor.Post("even", "disappears") // should not broadcast if engine not started
reactor.Start()
for i := 0; i < cap; i++ {
if i%2 == 0 {
name = "even"
} else {
name = "odd"
}
reactor.Post(name, i)
}
reactor.Post("test", cap) // this should not block
i := 0
reactor.Flush()
close(ch)
for event := range ch {
fmt.Printf("%d: %v", i, event)
if i%2 == 0 {
name = "even"
} else {
name = "odd"
}
if val, ok := event.Resource.(int); ok {
if i != val || event.Name != name {
t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val)
}
} else {
t.Error("Unable to cast")
}
i++
}
if i != cap {
t.Error("excpected exactly %d events, got ", i)
}
reactor.Stop()
}

View File

@ -1,87 +0,0 @@
package ethutil
import (
"sync"
)
type ReactorEvent struct {
mut sync.Mutex
event string
chans []chan React
}
// Post the specified reactor resource on the channels
// currently subscribed
func (e *ReactorEvent) Post(react React) {
e.mut.Lock()
defer e.mut.Unlock()
for _, ch := range e.chans {
go func(ch chan React) {
ch <- react
}(ch)
}
}
// Add a subscriber to this event
func (e *ReactorEvent) Add(ch chan React) {
e.mut.Lock()
defer e.mut.Unlock()
e.chans = append(e.chans, ch)
}
// Remove a subscriber
func (e *ReactorEvent) Remove(ch chan React) {
e.mut.Lock()
defer e.mut.Unlock()
for i, c := range e.chans {
if c == ch {
e.chans = append(e.chans[:i], e.chans[i+1:]...)
}
}
}
// Basic reactor resource
type React struct {
Resource interface{}
Event string
}
// The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters
type ReactorEngine struct {
patterns map[string]*ReactorEvent
}
func NewReactorEngine() *ReactorEngine {
return &ReactorEngine{patterns: make(map[string]*ReactorEvent)}
}
// Subscribe a channel to the specified event
func (reactor *ReactorEngine) Subscribe(event string, ch chan React) {
ev := reactor.patterns[event]
// Create a new event if one isn't available
if ev == nil {
ev = &ReactorEvent{event: event}
reactor.patterns[event] = ev
}
// Add the channel to reactor event handler
ev.Add(ch)
}
func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) {
ev := reactor.patterns[event]
if ev != nil {
ev.Remove(ch)
}
}
func (reactor *ReactorEngine) Post(event string, resource interface{}) {
ev := reactor.patterns[event]
if ev != nil {
ev.Post(React{Resource: resource, Event: event})
}
}

View File

@ -1,30 +0,0 @@
package ethutil
import "testing"
func TestReactorAdd(t *testing.T) {
engine := NewReactorEngine()
ch := make(chan React)
engine.Subscribe("test", ch)
if len(engine.patterns) != 1 {
t.Error("Expected patterns to be 1, got", len(engine.patterns))
}
}
func TestReactorEvent(t *testing.T) {
engine := NewReactorEngine()
// Buffer 1, so it doesn't block for this test
ch := make(chan React, 1)
engine.Subscribe("test", ch)
engine.Post("test", "hello")
value := <-ch
if val, ok := value.Resource.(string); ok {
if val != "hello" {
t.Error("Expected Resource to be 'hello', got", val)
}
} else {
t.Error("Unable to cast")
}
}