Node/CCQ/Server: Clean restart (#3598)

* Node/CCQ/Server: Clean restart

* Code review rework

* Code review rework
This commit is contained in:
bruce-riley 2023-12-11 16:00:46 -06:00 committed by GitHub
parent fd05cb0a48
commit 7a2a19c31b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 7 deletions

View File

@ -56,6 +56,8 @@ spec:
- --bootstrap
- /dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
- --logLevel=info
- --shutdownDelay1
- "0"
ports:
- containerPort: 6069
name: rest

View File

@ -198,6 +198,7 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
)
default:
logger.Error("failed to write query response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
// Leave the request in the pending map. It will get cleaned up if it times out.
}
} else {
logger.Info("waiting for more query responses",

View File

@ -59,3 +59,9 @@ func (p *PendingResponses) Remove(r *PendingResponse) {
defer p.mu.Unlock()
delete(p.pendingResponses, signature)
}
// NumPending reports how many query responses are currently outstanding.
// It takes the lock so the count is consistent with concurrent Add/Remove calls.
func (p *PendingResponses) NumPending() int {
	p.mu.Lock()
	numOutstanding := len(p.pendingResponses)
	p.mu.Unlock()
	return numOutstanding
}

View File

@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/telemetry"
@ -41,6 +42,8 @@ var (
telemetryNodeName *string
statusAddr *string
promRemoteURL *string
shutdownDelay1 *uint
shutdownDelay2 *uint
)
const DEV_NETWORK_ID = "/wormhole/dev"
@ -61,6 +64,12 @@ func init() {
telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
statusAddr = QueryServerCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
promRemoteURL = QueryServerCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)")
// The default health check monitoring is every five seconds, with a five second timeout, and you have to miss two, for 20 seconds total.
shutdownDelay1 = QueryServerCmd.Flags().Uint("shutdownDelay1", 25, "Seconds to delay after disabling health check on shutdown")
// The guardians will wait up to 60 seconds before giving up on a request.
shutdownDelay2 = QueryServerCmd.Flags().Uint("shutdownDelay2", 65, "Seconds to wait after delay1 for pending requests to complete")
}
var QueryServerCmd = &cobra.Command{
@ -175,11 +184,12 @@ func runQueryServer(cmd *cobra.Command, args []string) {
}()
// Start the status server
var statServer *statusServer
if *statusAddr != "" {
statServer = NewStatusServer(*statusAddr, logger, env)
go func() {
ss := NewStatusServer(*statusAddr, logger, env)
logger.Sugar().Infof("Status server listening on %s", *statusAddr)
err := ss.ListenAndServe()
err := statServer.httpServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logger.Fatal("Status server closed unexpectedly", zap.Error(err))
}
@ -209,7 +219,27 @@ func runQueryServer(cmd *cobra.Command, args []string) {
signal.Notify(sigterm, syscall.SIGTERM)
go func() {
<-sigterm
logger.Info("Received sigterm. exiting.")
if statServer != nil && *shutdownDelay1 != 0 {
logger.Info("Received sigterm. disabling health checks and pausing.")
statServer.disableHealth()
time.Sleep(time.Duration(*shutdownDelay1) * time.Second)
numPending := 0
logger.Info("Waiting for any outstanding requests to complete before shutting down.")
for count := 0; count < int(*shutdownDelay2); count++ {
time.Sleep(time.Second)
numPending = pendingResponses.NumPending()
if numPending == 0 {
break
}
}
if numPending == 0 {
logger.Info("Done waiting. shutting down.")
} else {
logger.Error("Gave up waiting for pending requests to finish. shutting down anyway.", zap.Int("numStillPending", numPending))
}
} else {
logger.Info("Received sigterm. exiting.")
}
cancel()
}()

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/certusone/wormhole/node/pkg/common"
@ -14,26 +15,39 @@ import (
)
type statusServer struct {
logger *zap.Logger
env common.Environment
logger *zap.Logger
env common.Environment
httpServer *http.Server
healthEnabled atomic.Bool
}
func NewStatusServer(addr string, logger *zap.Logger, env common.Environment) *http.Server {
func NewStatusServer(addr string, logger *zap.Logger, env common.Environment) *statusServer {
s := &statusServer{
logger: logger,
env: env,
}
s.healthEnabled.Store(true)
r := mux.NewRouter()
r.HandleFunc("/health", s.handleHealth).Methods("GET")
r.Handle("/metrics", promhttp.Handler())
return &http.Server{
s.httpServer = &http.Server{
Addr: addr,
Handler: r,
ReadHeaderTimeout: 5 * time.Second,
}
return s
}
// disableHealth atomically marks health checks as disabled; after this call
// handleHealth responds with 503 so orchestrators stop routing new requests
// to this instance during shutdown.
func (s *statusServer) disableHealth() {
	s.healthEnabled.Store(false)
}
func (s *statusServer) handleHealth(w http.ResponseWriter, r *http.Request) {
if !s.healthEnabled.Load() {
s.logger.Info("ignoring health check")
http.Error(w, "shutting down", http.StatusServiceUnavailable)
return
}
s.logger.Debug("health check")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")