diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index f1b88e26c..78b726090 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -1372,6 +1372,7 @@ func runNode(cmd *cobra.Command, args []string) { node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap), node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, *p2pPort, ibc.GetFeatures), node.GuardianOptionStatusServer(*statusAddr), + node.GuardianOptionProcessor(), } if shouldStart(publicGRPCSocketPath) { diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index 1ae4283d0..234fa9c1b 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -406,6 +406,35 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption { }} } +func GuardianOptionProcessor() *GuardianOption { + return &GuardianOption{ + name: "processor", + // governor and accountant may be set to nil, but that choice needs to be made before the processor is configured + dependencies: []string{"db", "governor", "accountant"}, + + f: func(ctx context.Context, logger *zap.Logger, g *G) error { + + g.runnables["processor"] = processor.NewProcessor(ctx, + g.db, + g.msgC.readC, + g.setC.readC, + g.gossipSendC, + g.obsvC, + g.obsvReqSendC.writeC, + g.injectC.readC, + g.signedInC.readC, + g.gk, + g.gst, + g.attestationEvents, + g.gov, + g.acct, + g.acctC.readC, + ).Run + + return nil + }} +} + type G struct { // rootCtxCancel is a context.CancelFunc. It MUST be a root context for any context that is passed to any member function of G. // It can be used by components to shut down the entire node if they encounter an unrecoverable state. @@ -537,6 +566,8 @@ func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) su } } + // TODO there is an opportunity to refactor the startup of the accountant and governor: + // Ideally they should just register a g.runnables["governor"] and g.runnables["accountant"] instead of being treated as special cases. if g.acct != nil { logger.Info("Starting accountant") if err := g.acct.Start(ctx); err != nil { @@ -551,30 +582,6 @@ func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) su } } - if g.db == nil { - logger.Fatal("no database configured, cannot start guardian.") - } - - logger.Info("Starting processor") - if err := supervisor.Run(ctx, "processor", processor.NewProcessor(ctx, - g.db, - g.msgC.readC, - g.setC.readC, - g.gossipSendC, - g.obsvC, - g.obsvReqSendC.writeC, - g.injectC.readC, - g.signedInC.readC, - g.gk, - g.gst, - g.attestationEvents, - g.gov, - g.acct, - g.acctC.readC, - ).Run); err != nil { - logger.Fatal("failed to start processor", zap.Error(err)) - } - // Start any other runnables for name, runnable := range g.runnables { if err := supervisor.Run(ctx, name, runnable); err != nil { diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index 8ac7f7966..61cb5b2e1 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -162,6 +162,7 @@ func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock GuardianOptionPublicrpcTcpService(publicRpc, common.GrpcLogDetailFull), GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap), GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(mockGuardianIndex))), + GuardianOptionProcessor(), } guardianNode := NewGuardianNode(