// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package filters implements an ethereum filtering system for block,
// transactions and log events.
package filters

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/core/vm"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/rpc"
	"golang.org/x/net/context"
)

// Type determines the kind of filter and is used to put the filter in to
// the correct bucket when added.
type Type byte

const (
	// UnknownSubscription indicates an unknown subscription type
	UnknownSubscription Type = iota
	// LogsSubscription queries for new or removed (chain reorg) logs
	LogsSubscription
	// PendingLogsSubscription queries for logs in pending blocks
	PendingLogsSubscription
	// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
	MinedAndPendingLogsSubscription
	// PendingTransactionsSubscription queries tx hashes for pending
	// transactions entering the pending state
	PendingTransactionsSubscription
	// BlocksSubscription queries headers of blocks that are imported
	BlocksSubscription
	// LastIndexSubscription keeps track of the last index; the event loop
	// uses it as the exclusive upper bound when creating one bucket per type.
	LastIndexSubscription
)

var (
	// ErrInvalidSubscriptionID is a sentinel error for an invalid subscription id.
	// NOTE(review): it is not referenced in the visible part of this file;
	// presumably it is returned by callers elsewhere in the package.
	ErrInvalidSubscriptionID = errors.New("invalid id")
)

// subscription is the internal record of an installed filter. All three
// delivery channels (logs, hashes, headers) are always non-nil so that
// Unsubscribe can drain whichever one the event loop may be blocked on.
type subscription struct {
	id        rpc.ID
	typ       Type
	created   time.Time // events are only delivered when their time is after this instant
	logsCrit  FilterCriteria
	logs      chan []*vm.Log
	hashes    chan common.Hash
	headers   chan *types.Header
	installed chan struct{} // closed when the filter is installed
	err       chan error    // closed when the filter is uninstalled
}

// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
	mux       *event.TypeMux
	sub       event.Subscription // NOTE(review): never assigned in the visible code
	backend   Backend
	lightMode bool
	lastHead  *types.Header      // last head passed to lightFilterNewHead
	install   chan *subscription // install filter for event notification
	uninstall chan *subscription // remove filter for event notification
}

// NewEventSystem creates a new manager that listens for event on the given mux,
// parses and filters them. The event loop holds its own index of installed
// filters that is used to forward events to them.
//
// The returned manager runs its loop in a separate goroutine; the loop returns
// once the channel of its mux subscription is closed.
// NOTE(review): the original comment mentions a Stop function, which is not
// visible in this part of the file.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
	m := &EventSystem{
		mux:       mux,
		backend:   backend,
		lightMode: lightMode,
		install:   make(chan *subscription),
		uninstall: make(chan *subscription),
	}

	go m.eventLoop()

	return m
}

// Subscription is created when the client registers itself for a particular event.
type Subscription struct {
	ID        rpc.ID
	f         *subscription
	es        *EventSystem
	unsubOnce sync.Once // makes Unsubscribe safe to call more than once
}

// Err returns a channel that is closed when unsubscribed.
func (sub *Subscription) Err() <-chan error {
	var errc <-chan error = sub.f.err
	return errc
}

// Unsubscribe removes the subscription from the event broadcast loop. Only the
// first call does any work; later calls return immediately.
func (sub *Subscription) Unsubscribe() {
	sub.unsubOnce.Do(func() {
		filter := sub.f

		// Keep offering the uninstall request while draining any pending
		// logs/hashes/headers. Without the drain the event loop could be
		// stuck sending to this filter while we are stuck waiting for it to
		// accept the uninstall request, which would deadlock both sides.
		for requested := false; !requested; {
			select {
			case sub.es.uninstall <- filter:
				requested = true
			case <-filter.logs:
			case <-filter.hashes:
			case <-filter.headers:
			}
		}

		// Block until the event loop has actually removed the filter, so the
		// caller may safely dispose of its event channel once we return.
		<-filter.err
	})
}

// subscribe hands the subscription to the event loop, waits until the loop
// reports it as installed and returns the public handle for it.
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
	es.install <- sub
	<-sub.installed

	handle := &Subscription{ID: sub.id, f: sub, es: es}
	return handle
}

// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []*vm.Log) (*Subscription, error) { var from, to rpc.BlockNumber if crit.FromBlock == nil { from = rpc.LatestBlockNumber } else { from = rpc.BlockNumber(crit.FromBlock.Int64()) } if crit.ToBlock == nil { to = rpc.LatestBlockNumber } else { to = rpc.BlockNumber(crit.ToBlock.Int64()) } // only interested in pending logs if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { return es.subscribePendingLogs(crit, logs), nil } // only interested in new mined logs if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil } // only interested in mined logs within a specific block range if from >= 0 && to >= 0 && to >= from { return es.subscribeLogs(crit, logs), nil } // interested in mined logs from a specific block number, new logs and pending logs if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { return es.subscribeMinedPendingLogs(crit, logs), nil } // interested in logs from a specific block number to new mined blocks if from >= 0 && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil } return nil, fmt.Errorf("invalid from and to block combination: from > to") } // subscribeMinedPendingLogs creates a subscription that returned mined and // pending logs that match the given criteria. func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: MinedAndPendingLogsSubscription, logsCrit: crit, created: time.Now(), logs: logs, hashes: make(chan common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), } return es.subscribe(sub) } // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. 
func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: LogsSubscription, logsCrit: crit, created: time.Now(), logs: logs, hashes: make(chan common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), } return es.subscribe(sub) } // subscribePendingLogs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingLogsSubscription, logsCrit: crit, created: time.Now(), logs: logs, hashes: make(chan common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), } return es.subscribe(sub) } // SubscribeNewHeads creates a subscription that writes the header of a block that is // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: BlocksSubscription, created: time.Now(), logs: make(chan []*vm.Log), hashes: make(chan common.Hash), headers: headers, installed: make(chan struct{}), err: make(chan error), } return es.subscribe(sub) } // SubscribePendingTxEvents creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, created: time.Now(), logs: make(chan []*vm.Log), hashes: hashes, headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), } return es.subscribe(sub) } type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. 
func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { if ev == nil { return } switch e := ev.Data.(type) { case vm.Logs: if len(e) > 0 { for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } } } case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } } case core.PendingLogsEvent: for _, f := range filters[PendingLogsSubscription] { if ev.Time.After(f.created) { if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } } case core.TxPreEvent: for _, f := range filters[PendingTransactionsSubscription] { if ev.Time.After(f.created) { f.hashes <- e.Tx.Hash() } } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { if ev.Time.After(f.created) { f.headers <- e.Block.Header() } } if es.lightMode && len(filters[LogsSubscription]) > 0 { es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { f.logs <- matchedLogs } } } }) } } } func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { oldh := es.lastHead es.lastHead = newHeader if oldh == nil { return } newh := newHeader // find common ancestor, create list of rolled back and new block hashes var oldHeaders, newHeaders []*types.Header for oldh.Hash() != newh.Hash() { if oldh.Number.Uint64() >= newh.Number.Uint64() { oldHeaders = 
append(oldHeaders, oldh) oldh = core.GetHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) } if oldh.Number.Uint64() < newh.Number.Uint64() { newHeaders = append(newHeaders, newh) newh = core.GetHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) if newh == nil { // happens when CHT syncing, nothing to do newh = oldh } } } // roll back old blocks for _, h := range oldHeaders { callBack(h, true) } // check new blocks (array is in reverse order) for i := len(newHeaders) - 1; i >= 0; i-- { callBack(newHeaders[i], false) } } // filter logs of a single header in light client mode func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*vm.Log { if bloomFilter(header.Bloom, addresses, topics) { // Get the logs of the block ctx, _ := context.WithTimeout(context.Background(), time.Second*5) receipts, err := es.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil } var unfiltered []*vm.Log for _, receipt := range receipts { for _, log := range receipt.Logs { logcopy := *log logcopy.Removed = remove unfiltered = append(unfiltered, &logcopy) } } logs := filterLogs(unfiltered, nil, nil, addresses, topics) return logs } return nil } // eventLoop (un)installs filters and processes mux events. 
func (es *EventSystem) eventLoop() { var ( index = make(filterIndex) sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{}) ) for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } for { select { case ev, active := <-sub.Chan(): if !active { // system stopped return } es.broadcast(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions index[LogsSubscription][f.id] = f index[PendingLogsSubscription][f.id] = f } else { index[f.typ][f.id] = f } close(f.installed) case f := <-es.uninstall: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions delete(index[LogsSubscription], f.id) delete(index[PendingLogsSubscription], f.id) } else { delete(index[f.typ], f.id) } close(f.err) } } }