Node/Watcher: make pyth ws restart on all errors
Change-Id: I9956876d4fa8f9fc4614d1e9e294a252cd3b7610
This commit is contained in:
parent
1658dbffec
commit
f6825e242e
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -246,33 +245,12 @@ func (s *SolanaWatcher) SetupWebSocket(ctx context.Context) error {
|
|||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
if err := func() error {
|
||||
|
||||
rCtx, cancel := context.WithTimeout(ctx, time.Second*300) // 5 minute
|
||||
defer cancel()
|
||||
|
||||
if _, msg, err := ws.Read(rCtx); err != nil {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
// When a websocket context times out, it closes the websocket... This means we have to re-subscribe
|
||||
ws.Close(websocket.StatusNormalClosure, "")
|
||||
err, ws = s.SetupSubscription(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if msg, err := s.readWebSocketWithTimeout(ctx, ws); err != nil {
|
||||
logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error()))
|
||||
if errors.Is(err, io.EOF) {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
s.pumpData <- msg
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -280,6 +258,13 @@ func (s *SolanaWatcher) SetupWebSocket(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *SolanaWatcher) readWebSocketWithTimeout(ctx context.Context, ws *websocket.Conn) ([]byte, error) {
|
||||
rCtx, cancel := context.WithTimeout(ctx, time.Second*300) // 5 minute
|
||||
defer cancel()
|
||||
_, msg, err := ws.Read(rCtx)
|
||||
return msg, err
|
||||
}
|
||||
|
||||
func (s *SolanaWatcher) Run(ctx context.Context) error {
|
||||
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
|
||||
contractAddr := base58.Encode(s.contract[:])
|
||||
|
|
Loading…
Reference in New Issue