diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index 322908598..45b9882ac 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "sync" "time" @@ -246,32 +245,11 @@ func (s *SolanaWatcher) SetupWebSocket(ctx context.Context) error { case <-ctx.Done(): return nil default: - if err := func() error { - - rCtx, cancel := context.WithTimeout(ctx, time.Second*300) // 5 minute - defer cancel() - - if _, msg, err := ws.Read(rCtx); err != nil { - if errors.Is(err, context.DeadlineExceeded) { - // When a websocket context times out, it closes the websocket... This means we have to re-subscribe - ws.Close(websocket.StatusNormalClosure, "") - err, ws = s.SetupSubscription(ctx) - if err != nil { - return err - } - return nil - } - - logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error())) - if errors.Is(err, io.EOF) { - return err - } - } else { - s.pumpData <- msg - } - return nil - }(); err != nil { + if msg, err := s.readWebSocketWithTimeout(ctx, ws); err != nil { + logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error())) return err + } else { + s.pumpData <- msg } } } @@ -280,6 +258,13 @@ func (s *SolanaWatcher) SetupWebSocket(ctx context.Context) error { return nil } +func (s *SolanaWatcher) readWebSocketWithTimeout(ctx context.Context, ws *websocket.Conn) ([]byte, error) { + rCtx, cancel := context.WithTimeout(ctx, time.Second*300) // 5 minute + defer cancel() + _, msg, err := ws.Read(rCtx) + return msg, err +} + func (s *SolanaWatcher) Run(ctx context.Context) error { // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing) contractAddr := base58.Encode(s.contract[:])