node/node: Make processor optional

This commit is contained in:
tbjump 2023-06-08 20:51:07 +00:00 committed by tbjump
parent 6d86ee1b5c
commit 4274115cc3
3 changed files with 33 additions and 24 deletions

View File

@ -1372,6 +1372,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap), node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, *p2pPort, ibc.GetFeatures), node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, *p2pPort, ibc.GetFeatures),
node.GuardianOptionStatusServer(*statusAddr), node.GuardianOptionStatusServer(*statusAddr),
node.GuardianOptionProcessor(),
} }
if shouldStart(publicGRPCSocketPath) { if shouldStart(publicGRPCSocketPath) {

View File

@ -406,6 +406,35 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption {
}} }}
} }
func GuardianOptionProcessor() *GuardianOption {
return &GuardianOption{
name: "processor",
// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
dependencies: []string{"db", "governor", "accountant"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
g.runnables["processor"] = processor.NewProcessor(ctx,
g.db,
g.msgC.readC,
g.setC.readC,
g.gossipSendC,
g.obsvC,
g.obsvReqSendC.writeC,
g.injectC.readC,
g.signedInC.readC,
g.gk,
g.gst,
g.attestationEvents,
g.gov,
g.acct,
g.acctC.readC,
).Run
return nil
}}
}
type G struct { type G struct {
// rootCtxCancel is a context.CancelFunc. It MUST be a root context for any context that is passed to any member function of G. // rootCtxCancel is a context.CancelFunc. It MUST be a root context for any context that is passed to any member function of G.
// It can be used by components to shut down the entire node if they encounter an unrecoverable state. // It can be used by components to shut down the entire node if they encounter an unrecoverable state.
@ -537,6 +566,8 @@ func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) su
} }
} }
// TODO there is an opportunity to refactor the startup of the accountant and governor:
// Ideally they should just register a g.runnables["governor"] and g.runnables["accountant"] instead of being treated as special cases.
if g.acct != nil { if g.acct != nil {
logger.Info("Starting accountant") logger.Info("Starting accountant")
if err := g.acct.Start(ctx); err != nil { if err := g.acct.Start(ctx); err != nil {
@ -551,30 +582,6 @@ func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) su
} }
} }
if g.db == nil {
logger.Fatal("no database configured, cannot start guardian.")
}
logger.Info("Starting processor")
if err := supervisor.Run(ctx, "processor", processor.NewProcessor(ctx,
g.db,
g.msgC.readC,
g.setC.readC,
g.gossipSendC,
g.obsvC,
g.obsvReqSendC.writeC,
g.injectC.readC,
g.signedInC.readC,
g.gk,
g.gst,
g.attestationEvents,
g.gov,
g.acct,
g.acctC.readC,
).Run); err != nil {
logger.Fatal("failed to start processor", zap.Error(err))
}
// Start any other runnables // Start any other runnables
for name, runnable := range g.runnables { for name, runnable := range g.runnables {
if err := supervisor.Run(ctx, name, runnable); err != nil { if err := supervisor.Run(ctx, name, runnable); err != nil {

View File

@ -162,6 +162,7 @@ func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock
GuardianOptionPublicrpcTcpService(publicRpc, common.GrpcLogDetailFull), GuardianOptionPublicrpcTcpService(publicRpc, common.GrpcLogDetailFull),
GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap), GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(mockGuardianIndex))), GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(mockGuardianIndex))),
GuardianOptionProcessor(),
} }
guardianNode := NewGuardianNode( guardianNode := NewGuardianNode(