Always cancel contexts to avoid leaking goroutines

This commit is contained in:
Leo 2020-08-21 23:47:58 +02:00
parent 6b113853bd
commit 30d921ec25
3 changed files with 17 additions and 8 deletions

View File

@ -66,7 +66,8 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard
len(gs.Keys), *devNumGuardians), len(gs.Keys), *devNumGuardians),
zap.Any("v", v)) zap.Any("v", v))
timeout, _ := context.WithTimeout(ctx, 15*time.Second) timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
tx, err := devnet.SubmitVAA(timeout, *ethRPC, v) tx, err := devnet.SubmitVAA(timeout, *ethRPC, v)
if err != nil { if err != nil {
logger.Error("failed to submit devnet guardian set change", zap.Error(err)) logger.Error("failed to submit devnet guardian set change", zap.Error(err))
@ -277,10 +278,12 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard
vaaC <- state.vaaSignatures[hash].ourVAA vaaC <- state.vaaSignatures[hash].ourVAA
} }
case t.TargetChain == vaa.ChainIDEthereum: case t.TargetChain == vaa.ChainIDEthereum:
timeout, _ := context.WithTimeout(ctx, 15*time.Second) timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
tx, err := devnet.SubmitVAA(timeout, *ethRPC, v) tx, err := devnet.SubmitVAA(timeout, *ethRPC, v)
cancel()
if err != nil { if err != nil {
logger.Error("failed to submit lockup to Ethereum", zap.Error(err)) logger.Error("failed to submit lockup to Ethereum", zap.Error(err))
break
} }
logger.Info("lockup submitted to Ethereum", zap.Any("tx", tx)) logger.Info("lockup submitted to Ethereum", zap.Any("tx", tx))
default: default:

View File

@ -43,7 +43,8 @@ func NewEthBridgeWatcher(url string, bridge eth_common.Address, minConfirmations
} }
func (e *EthBridgeWatcher) Run(ctx context.Context) error { func (e *EthBridgeWatcher) Run(ctx context.Context) error {
timeout, _ := context.WithTimeout(ctx, 15*time.Second) timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
c, err := ethclient.DialContext(timeout, e.url) c, err := ethclient.DialContext(timeout, e.url)
if err != nil { if err != nil {
return fmt.Errorf("dialing eth client failed: %w", err) return fmt.Errorf("dialing eth client failed: %w", err)
@ -60,7 +61,8 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
} }
// Timeout for initializing subscriptions // Timeout for initializing subscriptions
timeout, _ = context.WithTimeout(ctx, 15*time.Second) timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
defer cancel()
// Subscribe to new token lockups // Subscribe to new token lockups
tokensLockedC := make(chan *abi.AbiLogTokensLocked, 2) tokensLockedC := make(chan *abi.AbiLogTokensLocked, 2)
@ -94,8 +96,9 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return return
case ev := <-tokensLockedC: case ev := <-tokensLockedC:
// Request timestamp for block // Request timestamp for block
timeout, _ = context.WithTimeout(ctx, 15*time.Second) timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber))) b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber)))
cancel()
if err != nil { if err != nil {
errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err) errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err)
return return
@ -192,7 +195,8 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy) supervisor.Signal(ctx, supervisor.SignalHealthy)
// Fetch current guardian set // Fetch current guardian set
timeout, _ = context.WithTimeout(ctx, 15*time.Second) timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
defer cancel()
opts := &bind.CallOpts{Context: timeout} opts := &bind.CallOpts{Context: timeout}
currentIndex, err := caller.GuardianSetIndex(opts) currentIndex, err := caller.GuardianSetIndex(opts)

View File

@ -43,7 +43,8 @@ func padAddress(address eth_common.Address) vaa.Address {
} }
func (e *SolanaBridgeWatcher) Run(ctx context.Context) error { func (e *SolanaBridgeWatcher) Run(ctx context.Context) error {
timeout, _ := context.WithTimeout(ctx, 15*time.Second) timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
conn, err := grpc.DialContext(timeout, e.url, grpc.WithBlock(), grpc.WithInsecure()) conn, err := grpc.DialContext(timeout, e.url, grpc.WithBlock(), grpc.WithInsecure())
if err != nil { if err != nil {
return fmt.Errorf("failed to dial agent at %s: %w", e.url, err) return fmt.Errorf("failed to dial agent at %s: %w", e.url, err)
@ -113,8 +114,9 @@ func (e *SolanaBridgeWatcher) Run(ctx context.Context) error {
} }
h := hex.EncodeToString(m.Bytes()) h := hex.EncodeToString(m.Bytes())
timeout, _ := context.WithTimeout(ctx, 15*time.Second) timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
res, err := c.SubmitVAA(timeout, &agentv1.SubmitVAARequest{Vaa: vaaBytes}) res, err := c.SubmitVAA(timeout, &agentv1.SubmitVAARequest{Vaa: vaaBytes})
cancel()
if err != nil { if err != nil {
logger.Error("failed to submit VAA", zap.Error(err), zap.String("digest", h)) logger.Error("failed to submit VAA", zap.Error(err), zap.String("digest", h))
break break