diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index bbd727b38..d318fffde 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -76,9 +76,30 @@ type mockGuardian struct { gk *ecdsa.PrivateKey guardianAddr eth_common.Address ready bool + config *guardianConfig } -func newMockGuardianSet(n int) []*mockGuardian { +type guardianConfig struct { + publicSocket string + adminSocket string + publicRpc string + publicWeb string + statusPort uint + p2pPort uint +} + +func createGuardianConfig(testId uint, mockGuardianIndex uint) *guardianConfig { + return &guardianConfig{ + publicSocket: fmt.Sprintf("/tmp/test_guardian_%d_public.socket", mockGuardianIndex+testId*20), + adminSocket: fmt.Sprintf("/tmp/test_guardian_%d_admin.socket", mockGuardianIndex+testId*20), // TODO consider using os.CreateTemp("/tmp", "test_guardian_adminXXXXX.socket"), + publicRpc: fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_RPC_PORTRANGE_START+testId*20), + publicWeb: fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_PUBLICWEB_PORTRANGE_START+testId*20), + statusPort: mockGuardianIndex + LOCAL_STATUS_PORTRANGE_START + testId*20, + p2pPort: mockGuardianIndex + LOCAL_P2P_PORTRANGE_START + testId*20, + } +} + +func newMockGuardianSet(testId uint, n int) []*mockGuardian { gs := make([]*mockGuardian, n) for i := 0; i < n; i++ { @@ -94,6 +115,7 @@ func newMockGuardianSet(n int) []*mockGuardian { MockSetC: make(chan *common.GuardianSet), gk: gk, guardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey), + config: createGuardianConfig(testId, uint(i)), } } @@ -108,32 +130,8 @@ func mockGuardianSetToGuardianAddrList(gs []*mockGuardian) []eth_common.Address return result } -func mockPublicSocket(testId uint, mockGuardianIndex uint) string { - return fmt.Sprintf("/tmp/test_guardian_%d_public.socket", mockGuardianIndex+testId*20) -} - -func mockAdminStocket(testId uint, mockGuardianIndex uint) string { - return fmt.Sprintf("/tmp/test_guardian_%d_admin.socket", mockGuardianIndex+testId*20) -} - -func mockPublicRpc(testId uint, mockGuardianIndex uint) string { - return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_RPC_PORTRANGE_START+testId*20) -} - -func mockPublicWeb(testId uint, mockGuardianIndex uint) string { - return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_PUBLICWEB_PORTRANGE_START+testId*20) -} - -func mockStatusPort(testId uint, mockGuardianIndex uint) uint { - return mockGuardianIndex + LOCAL_STATUS_PORTRANGE_START + testId*20 -} - -func mockP2PPort(testId uint, mockGuardianIndex uint) uint { - return mockGuardianIndex + LOCAL_P2P_PORTRANGE_START + testId*20 -} - // mockGuardianRunnable returns a runnable that first sets up a mock guardian an then runs it. -func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uint, obsDb mock.ObservationDb) supervisor.Runnable { +func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock.ObservationDb) supervisor.Runnable { return func(ctx context.Context) error { // Create a sub-context with cancel function that we can pass to G.run. ctx, ctxCancel := context.WithCancel(ctx) @@ -165,32 +163,28 @@ func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uin if err != nil { return err } - bootstrapPeers := fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", mockP2PPort(testId, 0), zeroPeerId.String()) - p2pPort := mockP2PPort(testId, mockGuardianIndex) - - // configure publicRpc - publicSocketPath := mockPublicSocket(testId, mockGuardianIndex) - publicRpc := mockPublicRpc(testId, mockGuardianIndex) + bootstrapPeers := fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", gs[0].config.p2pPort, zeroPeerId.String()) // configure adminservice - adminSocketPath := mockAdminStocket(testId, mockGuardianIndex) rpcMap := make(map[string]string) // We set this to None because we don't want to count these logs when counting the amount of logs generated per message publicRpcLogDetail := common.GrpcLogDetailNone + cfg := gs[mockGuardianIndex].config + // assemble all the options guardianOptions := []*GuardianOption{ GuardianOptionDatabase(db), GuardianOptionWatchers(watcherConfigs, nil), GuardianOptionNoAccountant(), // disable accountant GuardianOptionGovernor(true), - GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, p2pPort, func() string { return "" }), - GuardianOptionPublicRpcSocket(publicSocketPath, publicRpcLogDetail), - GuardianOptionPublicrpcTcpService(publicRpc, publicRpcLogDetail), - GuardianOptionPublicWeb(mockPublicWeb(testId, mockGuardianIndex), publicSocketPath, "", false, ""), - GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap), - GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(testId, mockGuardianIndex))), + GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, cfg.p2pPort, func() string { return "" }), + GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail), + GuardianOptionPublicrpcTcpService(cfg.publicRpc, publicRpcLogDetail), + GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""), + GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap), + GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)), GuardianOptionProcessor(), } @@ -253,14 +247,14 @@ func waitForHeartbeatsInLogs(t testing.TB, zapObserver *observer.ObservedLogs, g // WARNING: Currently, there is only a global registry for all prometheus metrics, leading to all guardian nodes writing to the same one. // // As long as this is the case, you probably don't want to use this function. -func waitForPromMetricGte(t testing.TB, testId uint, ctx context.Context, gs []*mockGuardian, metric string, min int) { +func waitForPromMetricGte(t testing.TB, ctx context.Context, gs []*mockGuardian, metric string, min int) { metricBytes := []byte(metric) requests := make([]*http.Request, len(gs)) readyFlags := make([]bool, len(gs)) // create the prom api clients for i := range gs { - url := fmt.Sprintf("http://localhost:%d/metrics", mockStatusPort(testId, uint(i))) + url := fmt.Sprintf("http://localhost:%d/metrics", gs[i].config.statusPort) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) assert.NoError(t, err) requests[i] = req @@ -582,13 +576,13 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) { logger := supervisor.Logger(ctx) // create the Guardian Set - gs := newMockGuardianSet(numGuardians) + gs := newMockGuardianSet(testId, numGuardians) obsDb := makeObsDb(testCases) // run the guardians for i := 0; i < numGuardians; i++ { - gRun := mockGuardianRunnable(testId, gs, uint(i), obsDb) + gRun := mockGuardianRunnable(gs, uint(i), obsDb) err := supervisor.Run(ctx, fmt.Sprintf("g-%d", i), gRun) if i == 0 && numGuardians > 1 { time.Sleep(time.Second) // give the bootstrap guardian some time to start up @@ -609,8 +603,8 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) { } // wait for the status server to come online and check that it works - for i := range gs { - err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(testId, uint(i)))) + for _, g := range gs { + err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", g.config.statusPort)) assert.NoError(t, err) } @@ -619,7 +613,7 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) { assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS) assert.False(t, WAIT_FOR_LOGS && WAIT_FOR_METRICS) // can't do both, because they both write to gs[].ready if WAIT_FOR_METRICS { - waitForPromMetricGte(t, testId, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1) + waitForPromMetricGte(t, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1) } if WAIT_FOR_LOGS { waitForHeartbeatsInLogs(t, zapObserver, gs) @@ -645,14 +639,14 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) { } // Wait for adminrpc to come online - for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", mockAdminStocket(testId, adminRpcGuardianIndex))).Len() == 0 { + for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[adminRpcGuardianIndex].config.adminSocket)).Len() == 0 { logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...") time.Sleep(time.Microsecond * 100) } // Send manual re-observation requests func() { // put this in own function to use defer - s := fmt.Sprintf("unix:///%s", mockAdminStocket(testId, vaaCheckGuardianIndex)) + s := fmt.Sprintf("unix:///%s", gs[adminRpcGuardianIndex].config.adminSocket) conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer conn.Close() @@ -677,14 +671,14 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) { }() // Wait for publicrpc to come online - for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(testId, vaaCheckGuardianIndex))).Len() == 0 { + for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", gs[vaaCheckGuardianIndex].config.publicRpc)).Len() == 0 { logger.Info("publicrpc seems to be offline (according to logs). Waiting 100ms...") time.Sleep(time.Microsecond * 100) } // check that the VAAs were generated logger.Info("Connecting to publicrpc...") - conn, err := grpc.DialContext(ctx, mockPublicRpc(testId, vaaCheckGuardianIndex), grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.DialContext(ctx, gs[vaaCheckGuardianIndex].config.publicRpc, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer conn.Close() @@ -1022,13 +1016,13 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages logger := supervisor.Logger(ctx) // create the Guardian Set - gs := newMockGuardianSet(numGuardians) + gs := newMockGuardianSet(testId, numGuardians) var obsDb mock.ObservationDb = nil // TODO // run the guardians for i := 0; i < numGuardians; i++ { - gRun := mockGuardianRunnable(testId, gs, uint(i), obsDb) + gRun := mockGuardianRunnable(gs, uint(i), obsDb) err := supervisor.Run(ctx, fmt.Sprintf("g-%d", i), gRun) if i == 0 && numGuardians > 1 { time.Sleep(time.Second) // give the bootstrap guardian some time to start up @@ -1049,8 +1043,8 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages } // wait for the status server to come online and check that it works - for i := range gs { - err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(testId, uint(i)))) + for _, g := range gs { + err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", g.config.statusPort)) assert.NoError(t, err) } @@ -1058,7 +1052,7 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages // This is necessary because if they have not joined the p2p network yet, gossip messages may get dropped silently. assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS) if WAIT_FOR_METRICS { - waitForPromMetricGte(t, testId, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1) + waitForPromMetricGte(t, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1) } if WAIT_FOR_LOGS { waitForHeartbeatsInLogs(t, zapObserver, gs) @@ -1066,12 +1060,12 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages logger.Info("All Guardians have received at least one heartbeat.") // Wait for publicrpc to come online. - for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(testId, 0))).Len() == 0 { + for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", gs[0].config.publicRpc)).Len() == 0 { logger.Info("publicrpc seems to be offline (according to logs). Waiting 100ms...") time.Sleep(time.Microsecond * 100) } // now that it's online, connect to publicrpc of guardian-0 - conn, err := grpc.DialContext(ctx, mockPublicRpc(testId, 0), grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.DialContext(ctx, gs[0].config.publicRpc, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer conn.Close() c := publicrpcv1.NewPublicRPCServiceClient(conn)