diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 6d16f572d..bb11734a7 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -92,8 +93,21 @@ type EventSystem struct { backend Backend lightMode bool lastHead *types.Header - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification + + // Subscriptions + txSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + chainSub event.Subscription // Subscription for new chain event + pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + + // Channels + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txCh chan core.TxPreEvent // Channel to receive new transaction event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -109,10 +123,27 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), + txCh: make(chan core.TxPreEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + } + + // Subscribe events + m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) + m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) + m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + // TODO(rjl493456442): use feed to subscribe pending log event + m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + + // Make sure none of the subscriptions are empty + if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + m.pendingLogSub.Closed() { + log.Crit("Subscribe for event system failed") } go m.eventLoop() - return m } @@ -412,52 +443,37 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // eventLoop (un)installs filters and processes mux events. func (es *EventSystem) eventLoop() { - var ( - index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}) - // Subscribe TxPreEvent form txpool - txCh = make(chan core.TxPreEvent, txChanSize) - txSub = es.backend.SubscribeTxPreEvent(txCh) - // Subscribe RemovedLogsEvent - rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) - rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) - // Subscribe []*types.Log - logsCh = make(chan []*types.Log, logsChanSize) - logsSub = es.backend.SubscribeLogsEvent(logsCh) - // Subscribe ChainEvent - chainEvCh = make(chan core.ChainEvent, chainEvChanSize) - chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) - ) - - // Unsubscribe all events - defer sub.Unsubscribe() - defer txSub.Unsubscribe() - defer rmLogsSub.Unsubscribe() - defer logsSub.Unsubscribe() - defer chainEvSub.Unsubscribe() + // Ensure all subscriptions get cleaned up + defer func() { + es.pendingLogSub.Unsubscribe() + es.txSub.Unsubscribe() + es.logsSub.Unsubscribe() + es.rmLogsSub.Unsubscribe() + es.chainSub.Unsubscribe() + }() + index := make(filterIndex) for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } for { select { - case ev, active := <-sub.Chan(): + // Handle subscribed events + case ev := <-es.txCh: + es.broadcast(index, ev) + case ev := <-es.logsCh: + es.broadcast(index, ev) + case ev := <-es.rmLogsCh: + es.broadcast(index, ev) + case ev := <-es.chainCh: + es.broadcast(index, ev) + case ev, active := <-es.pendingLogSub.Chan(): if !active { // system stopped return } es.broadcast(index, ev) - // Handle subscribed events - case ev := <-txCh: - es.broadcast(index, ev) - case ev := <-rmLogsCh: - es.broadcast(index, ev) - case ev := <-logsCh: - es.broadcast(index, ev) - case ev := <-chainEvCh: - es.broadcast(index, ev) - case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -467,6 +483,7 @@ func (es *EventSystem) eventLoop() { index[f.typ][f.id] = f } close(f.installed) + case f := <-es.uninstall: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -478,13 +495,13 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-txSub.Err(): + case <-es.txSub.Err(): return - case <-rmLogsSub.Err(): + case <-es.logsSub.Err(): return - case <-logsSub.Err(): + case <-es.rmLogsSub.Err(): return - case <-chainEvSub.Err(): + case <-es.chainSub.Err(): return } } diff --git a/event/event.go b/event/event.go index 20d20d1f5..423278731 100644 --- a/event/event.go +++ b/event/event.go @@ -180,6 +180,12 @@ func (s *TypeMuxSubscription) Unsubscribe() { s.closewait() } +func (s *TypeMuxSubscription) Closed() bool { + s.closeMu.Lock() + defer s.closeMu.Unlock() + return s.closed +} + func (s *TypeMuxSubscription) closewait() { s.closeMu.Lock() defer s.closeMu.Unlock()