node: switch to use recovery and cleanly restart watcher

Josh Siegel 2023-01-02 15:53:51 +00:00 committed by jumpsiegel
parent 5d8072e3da
commit 4ddeca4dbd
4 changed files with 180 additions and 42 deletions

View File

@ -1136,7 +1136,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessNearSyncing)
chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "nearwatch",
near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run); err != nil {
common.WrapWithScissors(near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run)); err != nil {
return err
}
}
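
The only change at this call site is that the watcher's Run method is handed to supervisor.Run through common.WrapWithScissors (added in the new common package file below), so a panic anywhere inside the watcher is returned to the supervisor as an ordinary error instead of crashing the node. A minimal sketch of what the wrapper does, using an illustrative runnable that is not part of this commit:

    wrapped := common.WrapWithScissors(func(ctx context.Context) error {
        panic("boom")
    })
    // The wrapper keeps the supervisor.Runnable signature func(context.Context) error;
    // the panic is recovered and comes back as a plain error.
    err := wrapped(context.Background()) // err.Error() == "boom"
    _ = err

At the call site above, supervisor.Run therefore sees an error return rather than an escaping panic when the NEAR watcher fails.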

View File

@ -0,0 +1,66 @@
package common

import (
    "context"
    "fmt"

    "github.com/certusone/wormhole/node/pkg/supervisor"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    ScissorsErrors = promauto.NewCounter(
        prometheus.CounterOpts{
            Name: "scissor_errors_caught",
            Help: "Total number of unhandled errors caught",
        })
)
// RunWithScissors starts the runnable in a new goroutine, recovering from any panic and sending the resulting error to the given error channel.
func RunWithScissors(ctx context.Context, errC chan error, name string, runnable supervisor.Runnable) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                switch x := r.(type) {
                case error:
                    errC <- fmt.Errorf("%s: %w", name, x)
                default:
                    errC <- fmt.Errorf("%s: %v", name, x)
                }
                ScissorsErrors.Inc()
            }
        }()
        err := runnable(ctx)
        if err != nil {
            errC <- err
        }
    }()
}
type (
    Scissors struct {
        runnable supervisor.Runnable
    }
)

// WrapWithScissors wraps a runnable so that a panic inside it is returned
// to the caller as an error instead of taking down the process.
func WrapWithScissors(runnable supervisor.Runnable) supervisor.Runnable {
    s := Scissors{runnable: runnable}
    return s.Run
}
func (e *Scissors) Run(ctx context.Context) (result error) {
    defer func() {
        if r := recover(); r != nil {
            switch x := r.(type) {
            case error:
                result = x
            default:
                result = fmt.Errorf("%v", x)
            }
            ScissorsErrors.Inc()
        }
    }()

    return e.runnable(ctx)
}
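
RunWithScissors is the counterpart used inside the NEAR watcher below: the parent runnable owns an error channel, every scissored goroutine reports recovered panics (and ordinary returned errors) into it, and the parent selects on that channel alongside ctx.Done(). A minimal sketch of the pattern, with an illustrative parentRunnable that is not part of this commit:

    func parentRunnable(ctx context.Context) error {
        errC := make(chan error)

        // A panic in the child is recovered by RunWithScissors and arrives
        // here as the error "child: unexpected nil".
        common.RunWithScissors(ctx, errC, "child", func(ctx context.Context) error {
            panic("unexpected nil")
        })

        select {
        case <-ctx.Done():
            return ctx.Err()
        case err := <-errC:
            return err
        }
    }

Because the channel is unbuffered, as in Watcher.Run below, a recovered error is only delivered while the parent is still selecting on it.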

View File

@ -0,0 +1,80 @@
package common

import (
    "context"
    "errors"
    "fmt"
    "testing"

    "github.com/stretchr/testify/assert"
)

// throwNil deliberately dereferences a nil pointer to trigger a runtime panic.
func throwNil(ctx context.Context) error {
    var x *int = nil
    *x = 5
    return nil
}
func runTest(t *testing.T, ctx context.Context, testCase int) (result error) {
    t.Helper()

    defer func() {
        if r := recover(); r != nil {
            switch x := r.(type) {
            case string:
                result = errors.New(x)
            case error:
                result = x
            default:
                result = fmt.Errorf("unknown panic in runTest/%d", testCase)
            }
        }
    }()

    errC := make(chan error)

    switch testCase {
    case 0:
        _ = throwNil(ctx) // fall into defer above
    case 1:
        RunWithScissors(ctx, errC, "test1Thread", throwNil)
    case 2:
        RunWithScissors(ctx, errC, "test2Thread", func(ctx context.Context) error {
            <-ctx.Done()
            return nil
        })
        _ = throwNil(ctx)
    case 3:
        go func() { _ = throwNil(ctx) }() // uncatchable
    }

    select {
    case <-ctx.Done():
        return ctx.Err()
    case err := <-errC:
        return err
    }
}
func TestSupervisor(t *testing.T) {
    for i := 0; i < 3; i++ {
        t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
            rootCtx := context.Background()
            ctx, fn := context.WithCancel(rootCtx)

            err := runTest(t, ctx, i)

            switch i {
            case 0:
                assert.EqualError(t, err, "runtime error: invalid memory address or nil pointer dereference")
            case 1:
                assert.EqualError(t, err, "test1Thread: runtime error: invalid memory address or nil pointer dereference")
            case 2:
                assert.EqualError(t, err, "runtime error: invalid memory address or nil pointer dereference")
            }
            fn()
        },
        )
    }
}

View File

@ -79,6 +79,9 @@ type (
eventChanTxProcessedDuration chan time.Duration
eventChan chan eventType // whenever a message is confirmed, post true in here
// Error channel
errC chan error
// sub-components
finalizer Finalizer
nearAPI nearapi.NearApi
@ -129,8 +132,6 @@ func (e *Watcher) runBlockPoll(ctx context.Context) error {
highestFinalBlockHeightObserved := finalBlock.Header.Height - 1 // minus one because we still want to process this block, just no blocks before it
supervisor.Signal(ctx, supervisor.SignalHealthy)
timer := time.NewTimer(time.Nanosecond) // this is just for the first iteration.
for {
@ -180,8 +181,6 @@ func (e *Watcher) runChunkFetcher(ctx context.Context) error {
func (e *Watcher) runObsvReqProcessor(ctx context.Context) error {
logger := supervisor.Logger(ctx)
supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
select {
case <-ctx.Done():
@ -207,7 +206,7 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error {
func (e *Watcher) runTxProcessor(ctx context.Context) error {
logger := supervisor.Logger(ctx)
supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
select {
case <-ctx.Done():
@ -252,6 +251,8 @@ func (e *Watcher) runTxProcessor(ctx context.Context) error {
func (e *Watcher) Run(ctx context.Context) error {
logger := supervisor.Logger(ctx)
e.errC = make(chan error)
e.nearAPI = nearapi.NewNearApiImpl(nearapi.NewHttpNearRpc(e.nearRPC))
e.finalizer = newFinalizer(e.eventChan, e.nearAPI, e.mainnet)
@ -262,61 +263,52 @@ func (e *Watcher) Run(ctx context.Context) error {
logger.Info("Near watcher connecting to RPC node ", zap.String("url", e.nearRPC))
// start metrics reporter
err := supervisor.Run(ctx, "metrics", e.runMetrics)
if err != nil {
return err
}
common.RunWithScissors(ctx, e.errC, "metrics", e.runMetrics)
// start one poller
err = supervisor.Run(ctx, "blockPoll", e.runBlockPoll)
if err != nil {
return err
}
common.RunWithScissors(ctx, e.errC, "blockPoll", e.runBlockPoll)
// start one obsvReqC runner
err = supervisor.Run(ctx, "obsvReqProcessor", e.runObsvReqProcessor)
if err != nil {
return err
}
common.RunWithScissors(ctx, e.errC, "obsvReqProcessor", e.runObsvReqProcessor)
// start `workerCount` many chunkFetcher runners
for i := 0; i < workerChunkFetching; i++ {
err := supervisor.Run(ctx, fmt.Sprintf("chunk_fetcher_%d", i), e.runChunkFetcher)
if err != nil {
return err
}
common.RunWithScissors(ctx, e.errC, fmt.Sprintf("chunk_fetcher_%d", i), e.runChunkFetcher)
}
// start `workerCount` many transactionProcessing runners
for i := 0; i < workerCountTxProcessing; i++ {
err := supervisor.Run(ctx, fmt.Sprintf("txProcessor_%d", i), e.runTxProcessor)
if err != nil {
return err
}
common.RunWithScissors(ctx, e.errC, fmt.Sprintf("txProcessor_%d", i), e.runTxProcessor)
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
return ctx.Err()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-e.errC:
return err
}
}
// schedule pushes a job to workers after a delay. It is context aware and will not execute the job if the context
// is cancelled before the delay has passed and the job has been picked up by a worker.
func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) {
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
common.RunWithScissors(ctx, e.errC, "scheduledThread",
func(ctx context.Context) error {
timer := time.NewTimer(delay)
defer timer.Stop()
e.transactionProcessingQueueCounter.Add(1)
defer e.transactionProcessingQueueCounter.Add(-1)
e.transactionProcessingQueueCounter.Add(1)
defer e.transactionProcessingQueueCounter.Add(-1)
select {
case <-ctx.Done():
return
case <-timer.C:
// Don't block on processing if the context is cancelled
select {
case <-ctx.Done():
return
case e.transactionProcessingQueue <- job:
return nil
case <-timer.C:
// Don't block on processing if the context is cancelled
select {
case <-ctx.Done():
return nil
case e.transactionProcessingQueue <- job:
}
}
}
}()
return nil
})
}
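
Taken together, the restart path this commit introduces is: a panic in any of the watcher's scissored goroutines (block poller, chunk fetchers, transaction processors, obsvReq processor, or the scheduled threads above) is recovered and pushed onto e.errC; Watcher.Run returns that error; and because the watcher's Run is itself wrapped with WrapWithScissors at the supervisor.Run call site in the first hunk, the supervisor sees a failed runnable and can restart the watcher cleanly rather than the process dying on an unrecovered panic. A self-contained toy sketch of that chain (toyWatcher and its worker are illustrative, not code from this commit; the import path is assumed from the imports in the new common package file above):

    package main

    import (
        "context"
        "fmt"

        "github.com/certusone/wormhole/node/pkg/common"
    )

    type toyWatcher struct{ errC chan error }

    func (w *toyWatcher) Run(ctx context.Context) error {
        w.errC = make(chan error)

        // A worker goroutine panics; RunWithScissors converts it to an error.
        common.RunWithScissors(ctx, w.errC, "worker", func(ctx context.Context) error {
            panic("worker blew up")
        })

        // Run exits with the recovered panic as its error, like Watcher.Run above.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err := <-w.errC:
            return err
        }
    }

    func main() {
        w := &toyWatcher{}
        // At the real call site this return value goes to supervisor.Run,
        // which applies its normal restart policy to the failed runnable.
        wrapped := common.WrapWithScissors(w.Run)
        fmt.Println(wrapped(context.Background())) // prints: worker: worker blew up
    }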