From 39eb919aaa55f69c852e1e1a7012b20321f7f1c9 Mon Sep 17 00:00:00 2001 From: Leo Date: Sun, 2 Aug 2020 23:10:53 +0200 Subject: [PATCH] Import supervision tree from internal SignOS repo Once SignOS is public, we can un-vendor this. --- bridge/pkg/supervisor/supervisor.go | 118 ++++ bridge/pkg/supervisor/supervisor_node.go | 267 +++++++++ bridge/pkg/supervisor/supervisor_processor.go | 390 ++++++++++++ bridge/pkg/supervisor/supervisor_support.go | 46 ++ bridge/pkg/supervisor/supervisor_test.go | 553 ++++++++++++++++++ .../pkg/supervisor/supervisor_testhelpers.go | 34 ++ 6 files changed, 1408 insertions(+) create mode 100644 bridge/pkg/supervisor/supervisor.go create mode 100644 bridge/pkg/supervisor/supervisor_node.go create mode 100644 bridge/pkg/supervisor/supervisor_processor.go create mode 100644 bridge/pkg/supervisor/supervisor_support.go create mode 100644 bridge/pkg/supervisor/supervisor_test.go create mode 100644 bridge/pkg/supervisor/supervisor_testhelpers.go diff --git a/bridge/pkg/supervisor/supervisor.go b/bridge/pkg/supervisor/supervisor.go new file mode 100644 index 000000000..49394eedd --- /dev/null +++ b/bridge/pkg/supervisor/supervisor.go @@ -0,0 +1,118 @@ +package supervisor + +// The service supervision library allows for writing of reliable, service-style software within SignOS. +// It builds upon the Erlang/OTP supervision tree system, adapted to be more Go-ish. +// For detailed design see go/supervision. + +import ( + "context" + "sync" + + "go.uber.org/zap" +) + +// A Runnable is a function that will be run in a goroutine, and supervised throughout its lifetime. It can in turn +// start more runnables as its children, and those will form part of a supervision tree. +// The context passed to a runnable is very important and needs to be handled properly. It will be live (non-errored) as +// long as the runnable should be running, and canceled (ctx.Err() will be non-nil) when the supervisor wants it to +// exit. This means this context is also perfectly usable for performing any blocking operations. +type Runnable func(ctx context.Context) error + +// RunGroup starts a set of runnables as a group. These runnables will run together, and if any one of them quits +// unexpectedly, the result will be canceled and restarted. +// The context here must be an existing Runnable context, and the spawned runnables will run under the node that this +// context represents. +func RunGroup(ctx context.Context, runnables map[string]Runnable) error { + node, unlock := fromContext(ctx) + defer unlock() + return node.runGroup(runnables) +} + +// Run starts a single runnable in its own group. +func Run(ctx context.Context, name string, runnable Runnable) error { + return RunGroup(ctx, map[string]Runnable{ + name: runnable, + }) +} + +// Signal tells the supervisor that the calling runnable has reached a certain state of its lifecycle. All runnables +// should SignalHealthy when they are ready with set up, running other child runnables and are now 'serving'. +func Signal(ctx context.Context, signal SignalType) { + node, unlock := fromContext(ctx) + defer unlock() + node.signal(signal) +} + +type SignalType int + +const ( + // The runnable is healthy, done with setup, done with spawning more Runnables, and ready to serve in a loop. + // The runnable needs to check the parent context and ensure that if that context is done, the runnable exits. + SignalHealthy SignalType = iota + // The runnable is done - it does not need to run any loop. 
This is useful for Runnables that only set up other + // child runnables. This runnable will be restarted if a related failure happens somewhere in the supervision tree. + SignalDone +) + +// Logger returns a Zap logger that will be named after the Distinguished Name of a the runnable (ie its place in the +// supervision tree, dot-separated). +func Logger(ctx context.Context) *zap.Logger { + node, unlock := fromContext(ctx) + defer unlock() + return node.getLogger() +} + +// supervisor represents and instance of the supervision system. It keeps track of a supervision tree and a request +// channel to its internal processor goroutine. +type supervisor struct { + // mu guards the entire state of the supervisor. + mu sync.RWMutex + // root is the root node of the supervision tree, named 'root'. It represents the Runnable started with the + // supervisor.New call. + root *node + // logger is the Zap logger used to create loggers available to runnables. + logger *zap.Logger + // ilogger is the Zap logger used for internal logging by the supervisor. + ilogger *zap.Logger + + // pReq is an interface channel to the lifecycle processor of the supervisor. + pReq chan *processorRequest + + // propagate panics, ie. don't catch them. + propagatePanic bool +} + +// SupervisorOpt are runtime configurable options for the supervisor. +type SupervisorOpt func(s *supervisor) + +var ( + // WithPropagatePanic prevents the Supervisor from catching panics in runnables and treating them as failures. + // This is useful to enable for testing and local debugging. + WithPropagatePanic = func(s *supervisor) { + s.propagatePanic = true + } +) + +// New creates a new supervisor with its root running the given root runnable. +// The given context can be used to cancel the entire supervision tree. +func New(ctx context.Context, logger *zap.Logger, rootRunnable Runnable, opts ...SupervisorOpt) *supervisor { + sup := &supervisor{ + logger: logger, + ilogger: logger.Named("supervisor"), + pReq: make(chan *processorRequest), + } + + for _, o := range opts { + o(sup) + } + + sup.root = newNode("root", rootRunnable, sup, nil) + + go sup.processor(ctx) + + sup.pReq <- &processorRequest{ + schedule: &processorRequestSchedule{dn: "root"}, + } + + return sup +} diff --git a/bridge/pkg/supervisor/supervisor_node.go b/bridge/pkg/supervisor/supervisor_node.go new file mode 100644 index 000000000..80dc302d9 --- /dev/null +++ b/bridge/pkg/supervisor/supervisor_node.go @@ -0,0 +1,267 @@ +package supervisor + +import ( + "context" + "fmt" + "regexp" + "strings" + + "github.com/cenkalti/backoff/v4" + "go.uber.org/zap" +) + +// node is a supervision tree node. It represents the state of a Runnable within this tree, its relation to other tree +// elements, and contains supporting data needed to actually supervise it. +type node struct { + // The name of this node. Opaque string. It's used to make up the 'dn' (distinguished name) of a node within + // the tree. When starting a runnable inside a tree, this is where that name gets used. + name string + runnable Runnable + + // The supervisor managing this tree. + sup *supervisor + // The parent, within the tree, of this node. If this is the root node of the tree, this is nil. + parent *node + // Children of this tree. This is represented by a map keyed from child node names, for easy access. + children map[string]*node + // Supervision groups. Each group is a set of names of children. Sets, and as such groups, don't overlap between + // each other. 
A supervision group indicates that if any child within that group fails, all others should be + // canceled and restarted together. + groups []map[string]bool + + // The current state of the runnable in this node. + state nodeState + + // Backoff used to keep runnables from being restarted too fast. + bo *backoff.ExponentialBackOff + + // Context passed to the runnable, and its cancel function. + ctx context.Context + ctxC context.CancelFunc +} + +// nodeState is the state of a runnable within a node, and in a way the node itself. +// This follows the state diagram from go/supervision. +type nodeState int + +const ( + // A node that has just been created, and whose runnable has been started already but hasn't signaled anything yet. + nodeStateNew nodeState = iota + // A node whose runnable has signaled being healthy - this means it's ready to serve/act. + nodeStateHealthy + // A node that has unexpectedly returned or panicked. + nodeStateDead + // A node that has declared that its done with its work and should not be restarted, unless a supervision tree + // failure requires that. + nodeStateDone + // A node that has returned after being requested to cancel. + nodeStateCanceled +) + +func (s nodeState) String() string { + switch s { + case nodeStateNew: + return "NODE_STATE_NEW" + case nodeStateHealthy: + return "NODE_STATE_HEALTHY" + case nodeStateDead: + return "NODE_STATE_DEAD" + case nodeStateDone: + return "NODE_STATE_DONE" + case nodeStateCanceled: + return "NODE_STATE_CANCELED" + } + return "UNKNOWN" +} + +func (n *node) String() string { + return fmt.Sprintf("%s (%s)", n.dn(), n.state.String()) +} + +// contextKey is a type used to keep data within context values. +type contextKey string + +var ( + supervisorKey = contextKey("supervisor") + dnKey = contextKey("dn") +) + +// fromContext retrieves a tree node from a runnable context. It takes a lock on the tree and returns an unlock +// function. This unlock function needs to be called once mutations on the tree/supervisor/node are done. +func fromContext(ctx context.Context) (*node, func()) { + sup, ok := ctx.Value(supervisorKey).(*supervisor) + if !ok { + panic("supervisor function called from non-runnable context") + } + + sup.mu.Lock() + + dnParent, ok := ctx.Value(dnKey).(string) + if !ok { + sup.mu.Unlock() + panic("supervisor function called from non-runnable context") + } + + return sup.nodeByDN(dnParent), sup.mu.Unlock +} + +// All the following 'internal' supervisor functions must only be called with the supervisor lock taken. Getting a lock +// via fromContext is enough. + +// dn returns the distinguished name of a node. This distinguished name is a period-separated, inverse-DNS-like name. +// For instance, the runnable 'foo' within the runnable 'bar' will be called 'root.bar.foo'. The root of the tree is +// always named, and has the dn, 'root'. +func (n *node) dn() string { + if n.parent != nil { + return fmt.Sprintf("%s.%s", n.parent.dn(), n.name) + } + return n.name +} + +// groupSiblings is a helper function to get all runnable group siblings of a given runnable name within this node. +// All children are always in a group, even if that group is unary. +func (n *node) groupSiblings(name string) map[string]bool { + for _, m := range n.groups { + if _, ok := m[name]; ok { + return m + } + } + return nil +} + +// newNode creates a new node with a given parent. It does not register it with the parent (as that depends on group +// placement). 
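+// The child node's context is derived in reset() from its parent's context, which is how cancellation cascades
+// down the supervision tree.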
+func newNode(name string, runnable Runnable, sup *supervisor, parent *node) *node { + n := &node{ + name: name, + runnable: runnable, + + bo: backoff.NewExponentialBackOff(), + + sup: sup, + parent: parent, + } + n.reset() + return n +} + +// resetNode sets up all the dynamic fields of the node, in preparation of starting a runnable. It clears the node's +// children, groups and resets its context. +func (n *node) reset() { + // Make new context. First, acquire parent context. For the root node that's Background, otherwise it's the + // parent's context. + var pCtx context.Context + if n.parent == nil { + pCtx = context.Background() + } else { + pCtx = n.parent.ctx + } + // Mark DN and supervisor in context. + ctx := context.WithValue(pCtx, dnKey, n.dn()) + ctx = context.WithValue(ctx, supervisorKey, n.sup) + ctx, ctxC := context.WithCancel(ctx) + // Set context + n.ctx = ctx + n.ctxC = ctxC + + // Clear children and state + n.state = nodeStateNew + n.children = make(map[string]*node) + n.groups = nil + + // The node is now ready to be scheduled. +} + +// nodeByDN returns a node by given DN from the supervisor. +func (s *supervisor) nodeByDN(dn string) *node { + parts := strings.Split(dn, ".") + if parts[0] != "root" { + panic("DN does not start with root.") + } + parts = parts[1:] + cur := s.root + for { + if len(parts) == 0 { + return cur + } + + next, ok := cur.children[parts[0]] + if !ok { + panic(fmt.Errorf("could not find %v (%s) in %s", parts, dn, cur)) + } + cur = next + parts = parts[1:] + } +} + +// reNodeName validates a node name against constraints. +var reNodeName = regexp.MustCompile(`[a-z90-9_]{1,64}`) + +// runGroup schedules a new group of runnables to run on a node. +func (n *node) runGroup(runnables map[string]Runnable) error { + // Check that the parent node is in the right state. + if n.state != nodeStateNew { + return fmt.Errorf("cannot run new runnable on non-NEW node") + } + + // Check the requested runnable names. + for name, _ := range runnables { + if !reNodeName.MatchString(name) { + return fmt.Errorf("runnable name %q is invalid", name) + } + if _, ok := n.children[name]; ok { + return fmt.Errorf("runnable %q already exists", name) + } + } + + // Create child nodes. + dns := make(map[string]string) + group := make(map[string]bool) + for name, runnable := range runnables { + if g := n.groupSiblings(name); g != nil { + return fmt.Errorf("duplicate child name %q", name) + } + node := newNode(name, runnable, n.sup, n) + n.children[name] = node + + dns[name] = node.dn() + group[name] = true + } + // Add group. + n.groups = append(n.groups, group) + + // Schedule execution of group members. + go func() { + for name, _ := range runnables { + n.sup.pReq <- &processorRequest{ + schedule: &processorRequestSchedule{ + dn: dns[name], + }, + } + } + }() + return nil +} + +// signal sequences state changes by signals received from runnables and updates a node's status accordingly. +func (n *node) signal(signal SignalType) { + switch signal { + case SignalHealthy: + if n.state != nodeStateNew { + panic(fmt.Errorf("node %s signaled healthy", n)) + } + n.state = nodeStateHealthy + n.bo.Reset() + case SignalDone: + if n.state != nodeStateHealthy { + panic(fmt.Errorf("node %s signaled done", n)) + } + n.state = nodeStateDone + n.bo.Reset() + } +} + +// getLogger creates a new logger for a given supervisor node, to be used by its runnable. 
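+// For example, a runnable started as 'foo' directly under the root gets a logger named after its DN, 'root.foo'.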
+func (n *node) getLogger() *zap.Logger { + return n.sup.logger.Named(n.dn()) +} diff --git a/bridge/pkg/supervisor/supervisor_processor.go b/bridge/pkg/supervisor/supervisor_processor.go new file mode 100644 index 000000000..309d44a29 --- /dev/null +++ b/bridge/pkg/supervisor/supervisor_processor.go @@ -0,0 +1,390 @@ +package supervisor + +import ( + "context" + "errors" + "fmt" + "runtime/debug" + "time" + + "go.uber.org/zap" +) + +// The processor maintains runnable goroutines - ie., when requested will start one, and then once it exists it will +// record the result and act accordingly. It is also responsible for detecting and acting upon supervision subtrees that +// need to be restarted after death (via a 'GC' process) + +// processorRequest is a request for the processor. Only one of the fields can be set. +type processorRequest struct { + schedule *processorRequestSchedule + died *processorRequestDied + waitSettled *processorRequestWaitSettled +} + +// processorRequestSchedule requests that a given node's runnable be started. +type processorRequestSchedule struct { + dn string +} + +// processorRequestDied is a signal from a runnable goroutine that the runnable has died. +type processorRequestDied struct { + dn string + err error +} + +type processorRequestWaitSettled struct { + waiter chan struct{} +} + +// processor is the main processing loop. +func (s *supervisor) processor(ctx context.Context) { + s.ilogger.Info("supervisor processor started") + + // Waiters waiting for the GC to be settled. + var waiters []chan struct{} + + // The GC will run every millisecond if needed. Any time the processor requests a change in the supervision tree + // (ie a death or a new runnable) it will mark the state as dirty and run the GC on the next millisecond cycle. + gc := time.NewTicker(1 * time.Millisecond) + defer gc.Stop() + clean := true + + // How long has the GC been clean. This is used to notify 'settled' waiters. + cleanCycles := 0 + + markDirty := func() { + clean = false + cleanCycles = 0 + } + + for { + select { + case <-ctx.Done(): + s.ilogger.Info("supervisor processor exiting...", zap.Error(ctx.Err())) + s.processKill() + s.ilogger.Info("supervisor exited") + return + case <-gc.C: + if !clean { + s.processGC() + } + clean = true + cleanCycles += 1 + + // This threshold is somewhat arbitrary. It's a balance between test speed and test reliability. + if cleanCycles > 50 { + for _, w := range waiters { + close(w) + } + waiters = nil + } + case r := <-s.pReq: + switch { + case r.schedule != nil: + s.processSchedule(r.schedule) + markDirty() + case r.died != nil: + s.processDied(r.died) + markDirty() + case r.waitSettled != nil: + waiters = append(waiters, r.waitSettled.waiter) + default: + panic(fmt.Errorf("unhandled request %+v", r)) + } + } + } +} + +// processKill cancels all nodes in the supervision tree. This is only called right before exiting the processor, so +// they do not get automatically restarted. +func (s *supervisor) processKill() { + s.mu.Lock() + defer s.mu.Unlock() + + // Gather all context cancel functions. + var cancels []func() + queue := []*node{s.root} + for { + if len(queue) == 0 { + break + } + + cur := queue[0] + queue = queue[1:] + + cancels = append(cancels, cur.ctxC) + for _, c := range cur.children { + queue = append(queue, c) + } + } + + // Call all context cancels. + for _, c := range cancels { + c() + } +} + +// processSchedule starts a node's runnable in a goroutine and records its output once it's done. 
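+// The runnable's return value (or a recovered panic, unless propagatePanic is set) is reported back to the
+// processor as a processorRequestDied.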
+func (s *supervisor) processSchedule(r *processorRequestSchedule) { + s.mu.Lock() + defer s.mu.Unlock() + + n := s.nodeByDN(r.dn) + go func() { + if !s.propagatePanic { + defer func() { + if rec := recover(); rec != nil { + s.pReq <- &processorRequest{ + died: &processorRequestDied{ + dn: r.dn, + err: fmt.Errorf("panic: %v, stacktrace: %s", rec, string(debug.Stack())), + }, + } + } + }() + } + + res := n.runnable(n.ctx) + + s.pReq <- &processorRequest{ + died: &processorRequestDied{ + dn: r.dn, + err: res, + }, + } + }() +} + +// processDied records the result from a runnable goroutine, and updates its node state accordingly. If the result +// is a death and not an expected exit, related nodes (ie. children and group siblings) are canceled accordingly. +func (s *supervisor) processDied(r *processorRequestDied) { + s.mu.Lock() + defer s.mu.Unlock() + + // Okay, so a Runnable has quit. What now? + n := s.nodeByDN(r.dn) + ctx := n.ctx + + // Simple case: it was marked as Done and quit with no error. + if n.state == nodeStateDone && r.err == nil { + // Do nothing. This was supposed to happen. Keep the process as DONE. + return + } + + // Find innermost error to check if it's a context canceled error. + perr := r.err + for { + if inner := errors.Unwrap(perr); inner != nil { + perr = inner + continue + } + break + } + + // Simple case: the context was canceled and the returned error is the context error. + if err := ctx.Err(); err != nil && perr == err { + // Mark the node as canceled successfully. + n.state = nodeStateCanceled + return + } + + // Otherwise, the Runnable should not have died or quit. Handle accordingly. + err := r.err + // A lack of returned error is also an error. + if err == nil { + err = fmt.Errorf("returned when %s", n.state) + } else { + err = fmt.Errorf("returned error when %s: %w", n.state, err) + } + + s.ilogger.Error("Runnable died", zap.String("dn", n.dn()), zap.Error(err)) + // Mark as dead. + n.state = nodeStateDead + + // Cancel that node's context, just in case something still depends on it. + n.ctxC() + + // Cancel all siblings. + if n.parent != nil { + for name, _ := range n.parent.groupSiblings(n.name) { + if name == n.name { + continue + } + sibling := n.parent.children[name] + // TODO(q3k): does this need to run in a goroutine, ie. can a context cancel block? + sibling.ctxC() + } + } +} + +// processGC runs the GC process. It's not really Garbage Collection, as in, it doesn't remove unnecessary tree nodes - +// but it does find nodes that need to be restarted, find the subset that can and then schedules them for running. +// As such, it's less of a Garbage Collector and more of a Necromancer. However, GC is a friendlier name. +func (s *supervisor) processGC() { + s.mu.Lock() + defer s.mu.Unlock() + + // The 'GC' serves is the main business logic of the supervision tree. It traverses a locked tree and tries to + // find subtrees that must be restarted (because of a DEAD/CANCELED runnable). It then finds which of these + // subtrees that should be restarted can be restarted, ie. which ones are fully recursively DEAD/CANCELED. It + // also finds the smallest set of largest subtrees that can be restarted, ie. if there's multiple DEAD runnables + // that can be restarted at once, it will do so. + + // Phase one: Find all leaves. + // This is a simple DFS that finds all the leaves of the tree, ie all nodes that do not have children nodes. 
+ leaves := make(map[string]bool) + queue := []*node{s.root} + for { + if len(queue) == 0 { + break + } + cur := queue[0] + queue = queue[1:] + + for _, c := range cur.children { + queue = append([]*node{c}, queue...) + } + + if len(cur.children) == 0 { + leaves[cur.dn()] = true + } + } + + // Phase two: traverse tree from node to root and make note of all subtrees that can be restarted. + // A subtree is restartable/ready iff every node in that subtree is either CANCELED, DEAD or DONE. + // Such a 'ready' subtree can be restarted by the supervisor if needed. + + // DNs that we already visited. + visited := make(map[string]bool) + // DNs whose subtrees are ready to be restarted. + // These are all subtrees recursively - ie., root.a.a and root.a will both be marked here. + ready := make(map[string]bool) + + // We build a queue of nodes to visit, starting from the leaves. + queue = []*node{} + for l, _ := range leaves { + queue = append(queue, s.nodeByDN(l)) + } + + for { + if len(queue) == 0 { + break + } + + cur := queue[0] + curDn := cur.dn() + + queue = queue[1:] + + // Do we have a decision about our children? + allVisited := true + for _, c := range cur.children { + if !visited[c.dn()] { + allVisited = false + break + } + } + + // If no decision about children is available, it means we ended up in this subtree through some shorter path + // of a shorter/lower-order leaf. There is a path to a leaf that's longer than the one that caused this node + // to be enqueued. Easy solution: just push back the current element and retry later. + if !allVisited { + // Push back to queue and wait for a decision later. + queue = append(queue, cur) + continue + } + + // All children have been visited and we have an idea about whether they're ready/restartable. All of the node's + // children must be restartable in order for this node to be restartable. + childrenReady := true + for _, c := range cur.children { + if !ready[c.dn()] { + childrenReady = false + break + } + } + + // In addition to children, the node itself must be restartable (ie. DONE, DEAD or CANCELED). + curReady := false + switch cur.state { + case nodeStateDone: + curReady = true + case nodeStateCanceled: + curReady = true + case nodeStateDead: + curReady = true + } + + // Note down that we have an opinion on this node, and note that opinion down. + visited[curDn] = true + ready[curDn] = childrenReady && curReady + + // Now we can also enqueue the parent of this node for processing. + if cur.parent != nil && !visited[cur.parent.dn()] { + queue = append(queue, cur.parent) + } + } + + // Phase 3: traverse tree from root to find largest subtrees that need to be restarted and are ready to be + // restarted. + + // All DNs that need to be restarted by the GC process. + want := make(map[string]bool) + // All DNs that need to be restarted and can be restarted by the GC process - a subset of 'want' DNs. + can := make(map[string]bool) + // The set difference between 'want' and 'can' are all nodes that should be restarted but can't yet (ie. because + // a child is still in the process of being canceled). + + // DFS from root. + queue = []*node{s.root} + for { + if len(queue) == 0 { + break + } + + cur := queue[0] + queue = queue[1:] + + // If this node is DEAD or CANCELED it should be restarted. + if cur.state == nodeStateDead || cur.state == nodeStateCanceled { + want[cur.dn()] = true + } + + // If it should be restarted and is ready to be restarted... 
+ if want[cur.dn()] && ready[cur.dn()] { + // And its parent context is valid (ie hasn't been canceled), mark it as restartable. + if cur.parent == nil || cur.parent.ctx.Err() == nil { + can[cur.dn()] = true + continue + } + } + + // Otherwise, traverse further down the tree to see if something else needs to be done. + for _, c := range cur.children { + queue = append(queue, c) + } + } + + // Reinitialize and reschedule all subtrees + for dn, _ := range can { + n := s.nodeByDN(dn) + + // Only back off when the node unexpectedly died - not when it got canceled. + bo := time.Duration(0) + if n.state == nodeStateDead { + bo = n.bo.NextBackOff() + } + + // Prepare node for rescheduling - remove its children, reset its state to new. + n.reset() + s.ilogger.Info("rescheduling supervised node", zap.String("dn", dn), zap.Duration("backoff", bo)) + + // Reschedule node runnable to run after backoff. + go func(n *node, bo time.Duration) { + time.Sleep(bo) + s.pReq <- &processorRequest{ + schedule: &processorRequestSchedule{dn: n.dn()}, + } + }(n, bo) + } +} diff --git a/bridge/pkg/supervisor/supervisor_support.go b/bridge/pkg/supervisor/supervisor_support.go new file mode 100644 index 000000000..7421071f5 --- /dev/null +++ b/bridge/pkg/supervisor/supervisor_support.go @@ -0,0 +1,46 @@ +package supervisor + +// Supporting infrastructure to allow running some non-Go payloads under supervision. + +import ( + "context" + "net" + "os/exec" + + "google.golang.org/grpc" +) + +// GRPCServer creates a Runnable that serves gRPC requests as longs as it's not canceled. +// If graceful is set to true, the server will be gracefully stopped instead of plain stopped. This means all pending +// RPCs will finish, but also requires streaming gRPC handlers to check their context liveliness and exit accordingly. +// If the server code does not support this, `graceful` should be false and the server will be killed violently instead. +func GRPCServer(srv *grpc.Server, lis net.Listener, graceful bool) Runnable { + return func(ctx context.Context) error { + Signal(ctx, SignalHealthy) + errC := make(chan error) + go func() { + errC <- srv.Serve(lis) + }() + select { + case <-ctx.Done(): + if graceful { + srv.GracefulStop() + } else { + srv.Stop() + } + return ctx.Err() + case err := <-errC: + return err + } + } +} + +// Command will create a Runnable that starts a long-running command, whose exit is determined to be a failure. +func Command(name string, arg ...string) Runnable { + return func(ctx context.Context) error { + Signal(ctx, SignalHealthy) + + cmd := exec.CommandContext(ctx, name, arg...) 
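+		// exec.CommandContext kills the started process once ctx is canceled, so supervisor-driven cancellation
+		// terminates the command and Run returns the resulting error.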
+ return cmd.Run() + } +} diff --git a/bridge/pkg/supervisor/supervisor_test.go b/bridge/pkg/supervisor/supervisor_test.go new file mode 100644 index 000000000..ec6ab44af --- /dev/null +++ b/bridge/pkg/supervisor/supervisor_test.go @@ -0,0 +1,553 @@ +package supervisor + +import ( + "context" + "fmt" + "testing" + "time" + + "go.uber.org/zap" +) + +func runnableBecomesHealthy(healthy, done chan struct{}) Runnable { + return func(ctx context.Context) error { + Signal(ctx, SignalHealthy) + + go func() { + if healthy != nil { + healthy <- struct{}{} + } + }() + + <-ctx.Done() + + go func() { + if done != nil { + done <- struct{}{} + } + }() + + return ctx.Err() + } +} + +func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable { + return func(ctx context.Context) error { + if levels > 0 { + err := RunGroup(ctx, map[string]Runnable{ + "a": runnableSpawnsMore(nil, nil, levels-1), + "b": runnableSpawnsMore(nil, nil, levels-1), + }) + if err != nil { + return err + } + } + + Signal(ctx, SignalHealthy) + + go func() { + if healthy != nil { + healthy <- struct{}{} + } + }() + + <-ctx.Done() + + go func() { + if done != nil { + done <- struct{}{} + } + }() + return ctx.Err() + } +} + +// rc is a Remote Controlled runnable. It is a generic runnable used for testing the supervisor. +type rc struct { + req chan rcRunnableRequest +} + +type rcRunnableRequest struct { + cmd rcRunnableCommand + stateC chan rcRunnableState +} + +type rcRunnableCommand int + +const ( + rcRunnableCommandBecomeHealthy rcRunnableCommand = iota + rcRunnableCommandBecomeDone + rcRunnableCommandDie + rcRunnableCommandPanic + rcRunnableCommandState +) + +type rcRunnableState int + +const ( + rcRunnableStateNew rcRunnableState = iota + rcRunnableStateHealthy + rcRunnableStateDone +) + +func (r *rc) becomeHealthy() { + r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeHealthy} +} + +func (r *rc) becomeDone() { + r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeDone} +} +func (r *rc) die() { + r.req <- rcRunnableRequest{cmd: rcRunnableCommandDie} +} + +func (r *rc) panic() { + r.req <- rcRunnableRequest{cmd: rcRunnableCommandPanic} +} + +func (r *rc) state() rcRunnableState { + c := make(chan rcRunnableState) + r.req <- rcRunnableRequest{ + cmd: rcRunnableCommandState, + stateC: c, + } + return <-c +} + +func (r *rc) waitState(s rcRunnableState) { + // This is poll based. Making it non-poll based would make the RC runnable logic a bit more complex for little gain. 
+ for { + got := r.state() + if got == s { + return + } + time.Sleep(10 * time.Millisecond) + } +} + +func newRC() *rc { + return &rc{ + req: make(chan rcRunnableRequest), + } +} + +// Remote Controlled Runnable +func (r *rc) runnable() Runnable { + return func(ctx context.Context) error { + state := rcRunnableStateNew + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case r := <-r.req: + switch r.cmd { + case rcRunnableCommandBecomeHealthy: + Signal(ctx, SignalHealthy) + state = rcRunnableStateHealthy + case rcRunnableCommandBecomeDone: + Signal(ctx, SignalDone) + state = rcRunnableStateDone + case rcRunnableCommandDie: + return fmt.Errorf("died on request") + case rcRunnableCommandPanic: + panic("at the disco") + case rcRunnableCommandState: + r.stateC <- state + } + } + } + } +} + +func TestSimple(t *testing.T) { + h1 := make(chan struct{}) + d1 := make(chan struct{}) + h2 := make(chan struct{}) + d2 := make(chan struct{}) + + log, _ := zap.NewDevelopment() + ctx, ctxC := context.WithCancel(context.Background()) + defer ctxC() + s := New(ctx, log, func(ctx context.Context) error { + err := RunGroup(ctx, map[string]Runnable{ + "one": runnableBecomesHealthy(h1, d1), + "two": runnableBecomesHealthy(h2, d2), + }) + if err != nil { + return err + } + Signal(ctx, SignalHealthy) + Signal(ctx, SignalDone) + return nil + }, WithPropagatePanic) + + // Expect both to start running. + s.waitSettleError(ctx, t) + select { + case <-h1: + default: + t.Fatalf("runnable 'one' didn't start") + } + select { + case <-h2: + default: + t.Fatalf("runnable 'one' didn't start") + } +} + +func TestSimpleFailure(t *testing.T) { + h1 := make(chan struct{}) + d1 := make(chan struct{}) + two := newRC() + + log, _ := zap.NewDevelopment() + ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second) + defer ctxC() + s := New(ctx, log, func(ctx context.Context) error { + err := RunGroup(ctx, map[string]Runnable{ + "one": runnableBecomesHealthy(h1, d1), + "two": two.runnable(), + }) + if err != nil { + return err + } + Signal(ctx, SignalHealthy) + Signal(ctx, SignalDone) + return nil + }, WithPropagatePanic) + s.waitSettleError(ctx, t) + + two.becomeHealthy() + s.waitSettleError(ctx, t) + // Expect one to start running. + select { + case <-h1: + default: + t.Fatalf("runnable 'one' didn't start") + } + + // Kill off two, one should restart. + two.die() + s.waitSettleError(ctx, t) + select { + case <-d1: + default: + t.Fatalf("runnable 'one' didn't acknowledge cancel") + } + + // And one should start running again. + s.waitSettleError(ctx, t) + select { + case <-h1: + default: + t.Fatalf("runnable 'one' didn't restart") + } +} + +func TestDeepFailure(t *testing.T) { + h1 := make(chan struct{}) + d1 := make(chan struct{}) + two := newRC() + + log, _ := zap.NewDevelopment() + + ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second) + defer ctxC() + s := New(ctx, log, func(ctx context.Context) error { + err := RunGroup(ctx, map[string]Runnable{ + "one": runnableSpawnsMore(h1, d1, 5), + "two": two.runnable(), + }) + if err != nil { + return err + } + Signal(ctx, SignalHealthy) + Signal(ctx, SignalDone) + return nil + }, WithPropagatePanic) + + two.becomeHealthy() + s.waitSettleError(ctx, t) + // Expect one to start running. + select { + case <-h1: + default: + t.Fatalf("runnable 'one' didn't start") + } + + // Kill off two, one should restart. 
+ two.die() + s.waitSettleError(ctx, t) + select { + case <-d1: + default: + t.Fatalf("runnable 'one' didn't acknowledge cancel") + } + + // And one should start running again. + s.waitSettleError(ctx, t) + select { + case <-h1: + default: + t.Fatalf("runnable 'one' didn't restart") + } +} + +func TestPanic(t *testing.T) { + h1 := make(chan struct{}) + d1 := make(chan struct{}) + two := newRC() + + log, _ := zap.NewDevelopment() + ctx, ctxC := context.WithCancel(context.Background()) + defer ctxC() + s := New(ctx, log, func(ctx context.Context) error { + err := RunGroup(ctx, map[string]Runnable{ + "one": runnableBecomesHealthy(h1, d1), + "two": two.runnable(), + }) + if err != nil { + return err + } + Signal(ctx, SignalHealthy) + Signal(ctx, SignalDone) + return nil + }) + + two.becomeHealthy() + s.waitSettleError(ctx, t) + // Expect one to start running. + select { + case <-h1: + default: + t.Fatalf("runnable 'one' didn't start") + } + + // Kill off two, one should restart. + two.panic() + s.waitSettleError(ctx, t) + select { + case <-d1: + default: + t.Fatalf("runnable 'one' didn't acknowledge cancel") + } + + // And one should start running again. + s.waitSettleError(ctx, t) + select { + case <-h1: + default: + t.Fatalf("runnable 'one' didn't restart") + } +} + +func TestMultipleLevelFailure(t *testing.T) { + log, _ := zap.NewDevelopment() + ctx, ctxC := context.WithCancel(context.Background()) + defer ctxC() + New(ctx, log, func(ctx context.Context) error { + err := RunGroup(ctx, map[string]Runnable{ + "one": runnableSpawnsMore(nil, nil, 4), + "two": runnableSpawnsMore(nil, nil, 4), + }) + if err != nil { + return err + } + Signal(ctx, SignalHealthy) + Signal(ctx, SignalDone) + return nil + }, WithPropagatePanic) +} + +func TestBackoff(t *testing.T) { + one := newRC() + + log, _ := zap.NewDevelopment() + ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second) + defer ctxC() + + s := New(ctx, log, func(ctx context.Context) error { + if err := Run(ctx, "one", one.runnable()); err != nil { + return err + } + Signal(ctx, SignalHealthy) + Signal(ctx, SignalDone) + return nil + }, WithPropagatePanic) + + one.becomeHealthy() + // Die a bunch of times in a row, this brings up the next exponential backoff to over a second. + for i := 0; i < 4; i += 1 { + one.die() + one.waitState(rcRunnableStateNew) + } + // Measure how long it takes for the runnable to respawn after a number of failures + start := time.Now() + one.die() + one.becomeHealthy() + one.waitState(rcRunnableStateHealthy) + taken := time.Since(start) + if taken < 1*time.Second { + t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken) + } + + s.waitSettleError(ctx, t) + // Now that we've become healthy, die again. Becoming healthy resets the backoff. + start = time.Now() + one.die() + one.becomeHealthy() + one.waitState(rcRunnableStateHealthy) + taken = time.Since(start) + if taken > 1*time.Second || taken < 100*time.Millisecond { + t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken) + } +} + +// TestResilience throws some curveballs at the supervisor - either programming errors or high load. It then ensures +// that another runnable is running, and that it restarts on its sibling failure. +func TestResilience(t *testing.T) { + // request/response channel for testing liveness of the 'one' runnable + req := make(chan chan struct{}) + + // A runnable that responds on the 'req' channel. 
+ one := func(ctx context.Context) error { + Signal(ctx, SignalHealthy) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case r := <-req: + r <- struct{}{} + } + } + } + oneSibling := newRC() + + oneTest := func() { + timeout := time.NewTicker(1000 * time.Millisecond) + ping := make(chan struct{}) + req <- ping + select { + case <-ping: + case <-timeout.C: + t.Fatalf("one ping response timeout") + } + timeout.Stop() + } + + // A nasty runnable that calls Signal with the wrong context (this is a programming error) + two := func(ctx context.Context) error { + Signal(context.TODO(), SignalHealthy) + return nil + } + + // A nasty runnable that calls Signal wrong (this is a programming error). + three := func(ctx context.Context) error { + Signal(ctx, SignalDone) + return nil + } + + // A nasty runnable that runs in a busy loop (this is a programming error). + four := func(ctx context.Context) error { + for { + time.Sleep(0) + } + } + + // A nasty runnable that keeps creating more runnables. + five := func(ctx context.Context) error { + i := 1 + for { + err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2)) + if err != nil { + return err + } + + time.Sleep(100 * time.Millisecond) + i += 1 + } + } + + log, _ := zap.NewDevelopment() + ctx, ctxC := context.WithCancel(context.Background()) + defer ctxC() + New(ctx, log, func(ctx context.Context) error { + RunGroup(ctx, map[string]Runnable{ + "one": one, + "oneSibling": oneSibling.runnable(), + }) + rs := map[string]Runnable{ + "two": two, "three": three, "four": four, "five": five, + } + for k, v := range rs { + if err := Run(ctx, k, v); err != nil { + return err + } + } + Signal(ctx, SignalHealthy) + Signal(ctx, SignalDone) + return nil + }) + + // Five rounds of letting one run, then restarting it. + for i := 0; i < 5; i += 1 { + oneSibling.becomeHealthy() + oneSibling.waitState(rcRunnableStateHealthy) + + // 'one' should work for at least a second. + deadline := time.Now().Add(1 * time.Second) + for { + if time.Now().After(deadline) { + break + } + + oneTest() + } + + // Killing 'oneSibling' should restart one. + oneSibling.panic() + } + // Make sure 'one' is still okay. + oneTest() +} + +func ExampleNew() { + // Minimal runnable that is immediately done. + childC := make(chan struct{}) + child := func(ctx context.Context) error { + Signal(ctx, SignalHealthy) + close(childC) + Signal(ctx, SignalDone) + return nil + } + + log, _ := zap.NewDevelopment() + + // Start a supervision tree with a root runnable. + ctx, ctxC := context.WithCancel(context.Background()) + defer ctxC() + New(ctx, log, func(ctx context.Context) error { + err := Run(ctx, "child", child) + if err != nil { + return fmt.Errorf("could not run 'child': %w", err) + } + Signal(ctx, SignalHealthy) + + t := time.NewTicker(time.Second) + defer t.Stop() + + // Do something in the background, and exit on context cancel. + for { + select { + case <-t.C: + fmt.Printf("tick!") + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + // root.child will close this channel. + <-childC +} diff --git a/bridge/pkg/supervisor/supervisor_testhelpers.go b/bridge/pkg/supervisor/supervisor_testhelpers.go new file mode 100644 index 000000000..30db7832e --- /dev/null +++ b/bridge/pkg/supervisor/supervisor_testhelpers.go @@ -0,0 +1,34 @@ +package supervisor + +import ( + "context" + "testing" +) + +// waitSettle waits until the supervisor reaches a 'settled' state - ie., one +// where no actions have been performed for a number of GC cycles. +// This is used in tests only. 
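+// Internally, the processor closes the waiter channel once it has observed several consecutive clean GC cycles.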
+func (s *supervisor) waitSettle(ctx context.Context) error { + waiter := make(chan struct{}) + s.pReq <- &processorRequest{ + waitSettled: &processorRequestWaitSettled{ + waiter: waiter, + }, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-waiter: + return nil + } +} + +// waitSettleError wraps waitSettle to fail a test if an error occurs, eg. the +// context is canceled. +func (s *supervisor) waitSettleError(ctx context.Context, t *testing.T) { + err := s.waitSettle(ctx) + if err != nil { + t.Fatalf("waitSettle: %v", err) + } +}
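
For reviewers, a minimal usage sketch of the imported package (not part of the patch; the import path, listener address and runnable name below are placeholders, and error handling is abbreviated):

package main

import (
	"context"
	"net"

	"go.uber.org/zap"
	"google.golang.org/grpc"

	// Placeholder import path - substitute the bridge module's actual path to bridge/pkg/supervisor.
	"example.com/bridge/pkg/supervisor"
)

func main() {
	logger, _ := zap.NewDevelopment()

	// Canceling this context tears down the whole supervision tree.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	supervisor.New(ctx, logger, func(ctx context.Context) error {
		// Start a supervised gRPC server; if it dies, the supervisor restarts it with backoff.
		lis, err := net.Listen("tcp", "localhost:0")
		if err != nil {
			return err
		}
		if err := supervisor.Run(ctx, "grpc", supervisor.GRPCServer(grpc.NewServer(), lis, false)); err != nil {
			return err
		}

		supervisor.Signal(ctx, supervisor.SignalHealthy)

		// Keep serving until the supervisor cancels us.
		<-ctx.Done()
		return ctx.Err()
	})

	// A real daemon would cancel ctx on SIGTERM; here we simply block.
	<-ctx.Done()
}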