diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 45545a2a9..7c910b9d7 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -1136,7 +1136,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessNearSyncing) chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "nearwatch", - near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run); err != nil { + common.WrapWithScissors(near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run)); err != nil { return err } } diff --git a/node/pkg/common/scissors.go b/node/pkg/common/scissors.go new file mode 100644 index 000000000..71f55b1f1 --- /dev/null +++ b/node/pkg/common/scissors.go @@ -0,0 +1,66 @@ +package common + +import ( + "context" + "fmt" + + "github.com/certusone/wormhole/node/pkg/supervisor" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + ScissorsErrors = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "scissor_errors_caught", + Help: "Total number of unhandled errors caught", + }) +) + +// Start a go routine with recovering from any panic by sending an error to a error channel +func RunWithScissors(ctx context.Context, errC chan error, name string, runnable supervisor.Runnable) { + go func() { + defer func() { + if r := recover(); r != nil { + switch x := r.(type) { + case error: + errC <- fmt.Errorf("%s: %w", name, x) + default: + errC <- fmt.Errorf("%s: %v", name, x) + } + ScissorsErrors.Inc() + } + }() + err := runnable(ctx) + if err != nil { + errC <- err + } + }() +} + +type ( + Scissors struct { + runnable supervisor.Runnable + } +) + +func WrapWithScissors(runnable supervisor.Runnable) supervisor.Runnable { + s := Scissors{runnable: runnable} + return s.Run +} + +func (e *Scissors) Run(ctx context.Context) (result error) { + defer func() { + if r := recover(); r != nil { + switch x := r.(type) { + case error: + result = x + default: + result = fmt.Errorf("%v", x) + } + ScissorsErrors.Inc() + } + }() + + return e.runnable(ctx) +} diff --git a/node/pkg/common/scissors_test.go b/node/pkg/common/scissors_test.go new file mode 100644 index 000000000..b19c198cb --- /dev/null +++ b/node/pkg/common/scissors_test.go @@ -0,0 +1,80 @@ +package common + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func throwNil(ctx context.Context) error { + var x *int = nil + *x = 5 + return nil +} + +func runTest(t *testing.T, ctx context.Context, testCase int) (result error) { + t.Helper() + + defer func() { + if r := recover(); r != nil { + switch x := r.(type) { + case string: + result = errors.New(x) + case error: + result = x + default: + result = fmt.Errorf("unknown panic in runTest/%d", testCase) + } + } + }() + + errC := make(chan error) + + switch testCase { + case 0: + _ = throwNil(ctx) // fall into defer above + case 1: + RunWithScissors(ctx, errC, "test1Thread", throwNil) + case 2: + RunWithScissors(ctx, errC, "test2Thread", func(ctx context.Context) error { + <-ctx.Done() + return nil + }) + _ = throwNil(ctx) + + case 3: + go func() { _ = throwNil(ctx) }() // uncatchable + } + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: + return err + } +} + +func TestSupervisor(t *testing.T) { + for i := 0; i < 3; i++ { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + rootCtx := context.Background() + ctx, fn := context.WithCancel(rootCtx) + + err := runTest(t, ctx, i) + + switch i { + case 0: + assert.EqualError(t, err, "runtime error: invalid memory address or nil pointer dereference") + case 1: + assert.EqualError(t, err, "test1Thread: runtime error: invalid memory address or nil pointer dereference") + case 2: + assert.EqualError(t, err, "runtime error: invalid memory address or nil pointer dereference") + } + fn() + }, + ) + } +} diff --git a/node/pkg/watchers/near/watcher.go b/node/pkg/watchers/near/watcher.go index 87ee4bc41..8a267a788 100644 --- a/node/pkg/watchers/near/watcher.go +++ b/node/pkg/watchers/near/watcher.go @@ -79,6 +79,9 @@ type ( eventChanTxProcessedDuration chan time.Duration eventChan chan eventType // whenever a messages is confirmed, post true in here + // Error channel + errC chan error + // sub-components finalizer Finalizer nearAPI nearapi.NearApi @@ -129,8 +132,6 @@ func (e *Watcher) runBlockPoll(ctx context.Context) error { highestFinalBlockHeightObserved := finalBlock.Header.Height - 1 // minues one because we still want to process this block, just no blocks before it - supervisor.Signal(ctx, supervisor.SignalHealthy) - timer := time.NewTimer(time.Nanosecond) // this is just for the first iteration. for { @@ -180,8 +181,6 @@ func (e *Watcher) runChunkFetcher(ctx context.Context) error { func (e *Watcher) runObsvReqProcessor(ctx context.Context) error { logger := supervisor.Logger(ctx) - supervisor.Signal(ctx, supervisor.SignalHealthy) - for { select { case <-ctx.Done(): @@ -207,7 +206,7 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error { func (e *Watcher) runTxProcessor(ctx context.Context) error { logger := supervisor.Logger(ctx) - supervisor.Signal(ctx, supervisor.SignalHealthy) + for { select { case <-ctx.Done(): @@ -252,6 +251,8 @@ func (e *Watcher) runTxProcessor(ctx context.Context) error { func (e *Watcher) Run(ctx context.Context) error { logger := supervisor.Logger(ctx) + e.errC = make(chan error) + e.nearAPI = nearapi.NewNearApiImpl(nearapi.NewHttpNearRpc(e.nearRPC)) e.finalizer = newFinalizer(e.eventChan, e.nearAPI, e.mainnet) @@ -262,61 +263,52 @@ func (e *Watcher) Run(ctx context.Context) error { logger.Info("Near watcher connecting to RPC node ", zap.String("url", e.nearRPC)) // start metrics reporter - err := supervisor.Run(ctx, "metrics", e.runMetrics) - if err != nil { - return err - } + common.RunWithScissors(ctx, e.errC, "metrics", e.runMetrics) // start one poller - err = supervisor.Run(ctx, "blockPoll", e.runBlockPoll) - if err != nil { - return err - } + common.RunWithScissors(ctx, e.errC, "blockPoll", e.runBlockPoll) // start one obsvReqC runner - err = supervisor.Run(ctx, "obsvReqProcessor", e.runObsvReqProcessor) - if err != nil { - return err - } + common.RunWithScissors(ctx, e.errC, "obsvReqProcessor", e.runObsvReqProcessor) // start `workerCount` many chunkFetcher runners for i := 0; i < workerChunkFetching; i++ { - err := supervisor.Run(ctx, fmt.Sprintf("chunk_fetcher_%d", i), e.runChunkFetcher) - if err != nil { - return err - } + common.RunWithScissors(ctx, e.errC, fmt.Sprintf("chunk_fetcher_%d", i), e.runChunkFetcher) } // start `workerCount` many transactionProcessing runners for i := 0; i < workerCountTxProcessing; i++ { - err := supervisor.Run(ctx, fmt.Sprintf("txProcessor_%d", i), e.runTxProcessor) - if err != nil { - return err - } + common.RunWithScissors(ctx, e.errC, fmt.Sprintf("txProcessor_%d", i), e.runTxProcessor) } supervisor.Signal(ctx, supervisor.SignalHealthy) - <-ctx.Done() - return ctx.Err() + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-e.errC: + return err + } } // schedule pushes a job to workers after delay. It is context aware and will not execute the job if the context // is cancelled before delay has passed and the job is picked up by a worker. func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) { - go func() { - timer := time.NewTimer(delay) - defer timer.Stop() + common.RunWithScissors(ctx, e.errC, "scheduledThread", + func(ctx context.Context) error { + timer := time.NewTimer(delay) + defer timer.Stop() - e.transactionProcessingQueueCounter.Add(1) - defer e.transactionProcessingQueueCounter.Add(-1) + e.transactionProcessingQueueCounter.Add(1) + defer e.transactionProcessingQueueCounter.Add(-1) - select { - case <-ctx.Done(): - return - case <-timer.C: - // Don't block on processing if the context is cancelled select { case <-ctx.Done(): - return - case e.transactionProcessingQueue <- job: + return nil + case <-timer.C: + // Don't block on processing if the context is cancelled + select { + case <-ctx.Done(): + return nil + case e.transactionProcessingQueue <- job: + } } - } - }() + return nil + }) }