eth/filters: added notifications for out of bound log events

Out of Bound log events are events that were removed due to a fork. When
logs are received the filtering mechanism should check for the `removed`
field on the json structure.
This commit is contained in:
Jeffrey Wilcke 2016-01-05 14:55:28 +01:00
parent a50bccc642
commit 68dda34905
4 changed files with 132 additions and 26 deletions

View File

@ -206,12 +206,12 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
filter.SetEndBlock(latest) filter.SetEndBlock(latest)
filter.SetAddresses(addresses) filter.SetAddresses(addresses)
filter.SetTopics(topics) filter.SetTopics(topics)
filter.LogsCallback = func(logs vm.Logs) { filter.LogCallback = func(log *vm.Log, removed bool) {
s.logMu.Lock() s.logMu.Lock()
defer s.logMu.Unlock() defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil { if queue := s.logQueue[id]; queue != nil {
queue.add(logs...) queue.add(vmlog{log, removed})
} }
} }
@ -365,14 +365,14 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
} }
// GetLogs returns the logs matching the given argument. // GetLogs returns the logs matching the given argument.
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) vm.Logs { func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
filter := New(s.chainDb) filter := New(s.chainDb)
filter.SetBeginBlock(args.FromBlock.Int64()) filter.SetBeginBlock(args.FromBlock.Int64())
filter.SetEndBlock(args.ToBlock.Int64()) filter.SetEndBlock(args.ToBlock.Int64())
filter.SetAddresses(args.Addresses) filter.SetAddresses(args.Addresses)
filter.SetTopics(args.Topics) filter.SetTopics(args.Topics)
return returnLogs(filter.Find()) return toRPCLogs(filter.Find(), false)
} }
// UninstallFilter removes the filter with the given filter id. // UninstallFilter removes the filter with the given filter id.
@ -447,7 +447,7 @@ func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
} }
// logFilterChanged returns a collection of logs for the log filter with the given id. // logFilterChanged returns a collection of logs for the log filter with the given id.
func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs { func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
s.logMu.Lock() s.logMu.Lock()
defer s.logMu.Unlock() defer s.logMu.Unlock()
@ -458,17 +458,17 @@ func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs {
} }
// GetFilterLogs returns the logs for the filter with the given id. // GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) vm.Logs { func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
id, ok := s.filterMapping[filterId] id, ok := s.filterMapping[filterId]
if !ok { if !ok {
return returnLogs(nil) return toRPCLogs(nil, false)
} }
if filter := s.filterManager.Get(id); filter != nil { if filter := s.filterManager.Get(id); filter != nil {
return returnLogs(filter.Find()) return toRPCLogs(filter.Find(), false)
} }
return returnLogs(nil) return toRPCLogs(nil, false)
} }
// GetFilterChanges returns the logs for the filter with the given id since last time is was called. // GetFilterChanges returns the logs for the filter with the given id since last time is was called.
@ -488,28 +488,33 @@ func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
case transactionFilterTy: case transactionFilterTy:
return returnHashes(s.transactionFilterChanged(id)) return returnHashes(s.transactionFilterChanged(id))
case logFilterTy: case logFilterTy:
return returnLogs(s.logFilterChanged(id)) return s.logFilterChanged(id)
} }
return []interface{}{} return []interface{}{}
} }
type vmlog struct {
*vm.Log
Removed bool `json:"removed"`
}
type logQueue struct { type logQueue struct {
mu sync.Mutex mu sync.Mutex
logs vm.Logs logs []vmlog
timeout time.Time timeout time.Time
id int id int
} }
func (l *logQueue) add(logs ...*vm.Log) { func (l *logQueue) add(logs ...vmlog) {
l.mu.Lock() l.mu.Lock()
defer l.mu.Unlock() defer l.mu.Unlock()
l.logs = append(l.logs, logs...) l.logs = append(l.logs, logs...)
} }
func (l *logQueue) get() vm.Logs { func (l *logQueue) get() []vmlog {
l.mu.Lock() l.mu.Lock()
defer l.mu.Unlock() defer l.mu.Unlock()
@ -556,13 +561,16 @@ func newFilterId() (string, error) {
return "0x" + hex.EncodeToString(subid[:]), nil return "0x" + hex.EncodeToString(subid[:]), nil
} }
// returnLogs is a helper that will return an empty logs array case the given logs is nil, otherwise is will return the // toRPCLogs is a helper that will convert a vm.Logs array to an structure which
// given logs. The RPC interfaces defines that always an array is returned. // can hold additional information about the logs such as whether it was deleted.
func returnLogs(logs vm.Logs) vm.Logs { // Additionally when nil is given it will by default instead create an empty slice
if logs == nil { // instead. This is required by the RPC specification.
return vm.Logs{} func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
convertedLogs := make([]vmlog, len(logs))
for i, log := range logs {
convertedLogs[i] = vmlog{Log: log, Removed: removed}
} }
return logs return convertedLogs
} }
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will // returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will

View File

@ -39,7 +39,7 @@ type Filter struct {
BlockCallback func(*types.Block, vm.Logs) BlockCallback func(*types.Block, vm.Logs)
TransactionCallback func(*types.Transaction) TransactionCallback func(*types.Transaction)
LogsCallback func(vm.Logs) LogCallback func(*vm.Log, bool)
} }
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block

View File

@ -46,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
} }
fs.sub = mux.Subscribe( fs.sub = mux.Subscribe(
//core.PendingBlockEvent{}, //core.PendingBlockEvent{},
core.RemovedLogEvent{},
core.ChainEvent{}, core.ChainEvent{},
core.TxPreEvent{}, core.TxPreEvent{},
vm.Logs(nil), vm.Logs(nil),
@ -96,7 +97,7 @@ func (fs *FilterSystem) filterLoop() {
case core.ChainEvent: case core.ChainEvent:
fs.filterMu.RLock() fs.filterMu.RLock()
for id, filter := range fs.filters { for id, filter := range fs.filters {
if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs) filter.BlockCallback(ev.Block, ev.Logs)
} }
} }
@ -105,7 +106,7 @@ func (fs *FilterSystem) filterLoop() {
case core.TxPreEvent: case core.TxPreEvent:
fs.filterMu.RLock() fs.filterMu.RLock()
for id, filter := range fs.filters { for id, filter := range fs.filters {
if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
filter.TransactionCallback(ev.Tx) filter.TransactionCallback(ev.Tx)
} }
} }
@ -114,10 +115,20 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs: case vm.Logs:
fs.filterMu.RLock() fs.filterMu.RLock()
for id, filter := range fs.filters { for id, filter := range fs.filters {
if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
msgs := filter.FilterLogs(ev) for _, log := range filter.FilterLogs(ev) {
if len(msgs) > 0 { filter.LogCallback(log, false)
filter.LogsCallback(msgs) }
}
}
fs.filterMu.RUnlock()
case core.RemovedLogEvent:
fs.filterMu.RLock()
for id, filter := range fs.filters {
if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true)
} }
} }
} }

View File

@ -0,0 +1,87 @@
package filters
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/event"
)
func TestCallbacks(t *testing.T) {
var (
mux event.TypeMux
fs = NewFilterSystem(&mux)
blockDone = make(chan struct{})
txDone = make(chan struct{})
logDone = make(chan struct{})
removedLogDone = make(chan struct{})
)
blockFilter := &Filter{
BlockCallback: func(*types.Block, vm.Logs) {
close(blockDone)
},
}
txFilter := &Filter{
TransactionCallback: func(*types.Transaction) {
close(txDone)
},
}
logFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if !oob {
close(logDone)
}
},
}
removedLogFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if oob {
close(removedLogDone)
}
},
}
fs.Add(blockFilter)
fs.Add(txFilter)
fs.Add(logFilter)
fs.Add(removedLogFilter)
mux.Post(core.ChainEvent{})
mux.Post(core.TxPreEvent{})
mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
mux.Post(vm.Logs{&vm.Log{}})
const dura = 5 * time.Second
failTimer := time.NewTimer(dura)
select {
case <-blockDone:
case <-failTimer.C:
t.Error("block filter failed to trigger (timeout)")
}
failTimer.Reset(dura)
select {
case <-txDone:
case <-failTimer.C:
t.Error("transaction filter failed to trigger (timeout)")
}
failTimer.Reset(dura)
select {
case <-logDone:
case <-failTimer.C:
t.Error("log filter failed to trigger (timeout)")
}
failTimer.Reset(dura)
select {
case <-removedLogDone:
case <-failTimer.C:
t.Error("removed log filter failed to trigger (timeout)")
}
}