pkg/ethereum: fix watcher lifecycle

This commit is contained in:
Leo 2020-08-17 14:56:22 +02:00
parent c4d53247d3
commit d049aa08e7
1 changed files with 20 additions and 21 deletions

View File

@ -41,7 +41,7 @@ func NewEthBridgeWatcher(url string, bridge eth_common.Address, minConfirmations
} }
func (e *EthBridgeWatcher) Run(ctx context.Context) error { func (e *EthBridgeWatcher) Run(ctx context.Context) error {
c, err := ethclient.Dial(e.url) c, err := ethclient.DialContext(ctx, e.url)
if err != nil { if err != nil {
return fmt.Errorf("dialing eth client failed: %w", err) return fmt.Errorf("dialing eth client failed: %w", err)
} }
@ -60,19 +60,16 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
} }
defer eventSubscription.Unsubscribe() defer eventSubscription.Unsubscribe()
// We only add 1 to the wg so we stop when one of the routines stops/fails errC := make(chan error)
wg := sync.WaitGroup{} logger := supervisor.Logger(ctx)
wg.Add(1)
go func() { go func() {
defer wg.Done()
for { for {
select { select {
case <-ctx.Done():
return
case e := <-eventSubscription.Err(): case e := <-eventSubscription.Err():
if err != nil { errC <- e
err = e
}
return return
case ev := <-sink: case ev := <-sink:
lock := &common.ChainLock{ lock := &common.ChainLock{
@ -85,7 +82,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
Amount: ev.Amount, Amount: ev.Amount,
} }
supervisor.Logger(ctx).Info("found new lockup transaction", zap.Stringer("tx", ev.Raw.TxHash), logger.Info("found new lockup transaction", zap.Stringer("tx", ev.Raw.TxHash),
zap.Uint64("number", ev.Raw.BlockNumber)) zap.Uint64("number", ev.Raw.BlockNumber))
e.pendingLocksGuard.Lock() e.pendingLocksGuard.Lock()
e.pendingLocks[ev.Raw.TxHash] = &pendingLock{ e.pendingLocks[ev.Raw.TxHash] = &pendingLock{
@ -107,18 +104,16 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
defer headerSubscription.Unsubscribe() defer headerSubscription.Unsubscribe()
go func() { go func() {
defer wg.Done()
for { for {
select { select {
case <-ctx.Done():
return
case e := <-headerSubscription.Err(): case e := <-headerSubscription.Err():
if err != nil { errC <- e
err = e
}
return return
case ev := <-headSink: case ev := <-headSink:
start := time.Now() start := time.Now()
supervisor.Logger(ctx).Info("processing new header", zap.Stringer("number", ev.Number)) logger.Info("processing new header", zap.Stringer("number", ev.Number))
e.pendingLocksGuard.Lock() e.pendingLocksGuard.Lock()
blockNumberU := ev.Number.Uint64() blockNumberU := ev.Number.Uint64()
@ -126,7 +121,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
// Transaction was dropped and never picked up again // Transaction was dropped and never picked up again
if pLock.height+4*e.minConfirmations <= blockNumberU { if pLock.height+4*e.minConfirmations <= blockNumberU {
supervisor.Logger(ctx).Debug("lockup timed out", zap.Stringer("tx", pLock.txHash), logger.Debug("lockup timed out", zap.Stringer("tx", pLock.txHash),
zap.Stringer("number", ev.Number)) zap.Stringer("number", ev.Number))
delete(e.pendingLocks, hash) delete(e.pendingLocks, hash)
continue continue
@ -134,7 +129,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
// Transaction is now ready // Transaction is now ready
if pLock.height+e.minConfirmations <= ev.Number.Uint64() { if pLock.height+e.minConfirmations <= ev.Number.Uint64() {
supervisor.Logger(ctx).Debug("lockup confirmed", zap.Stringer("tx", pLock.txHash), logger.Debug("lockup confirmed", zap.Stringer("tx", pLock.txHash),
zap.Stringer("number", ev.Number)) zap.Stringer("number", ev.Number))
delete(e.pendingLocks, hash) delete(e.pendingLocks, hash)
e.evChan <- pLock.lock e.evChan <- pLock.lock
@ -142,14 +137,18 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
} }
e.pendingLocksGuard.Unlock() e.pendingLocksGuard.Unlock()
supervisor.Logger(ctx).Info("processed new header", zap.Stringer("number", ev.Number), logger.Info("processed new header", zap.Stringer("number", ev.Number),
zap.Duration("took", time.Since(start))) zap.Duration("took", time.Since(start)))
} }
} }
}() }()
supervisor.Signal(ctx, supervisor.SignalHealthy) supervisor.Signal(ctx, supervisor.SignalHealthy)
wg.Wait()
return err select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
}
} }