node/node_test: cleanup guardianConfig

tbjump 2023-07-12 00:07:16 +00:00 committed by tbjump
parent 0896d028bd
commit d6f2e61861
1 changed file with 51 additions and 57 deletions


@@ -76,9 +76,30 @@ type mockGuardian struct {
gk *ecdsa.PrivateKey
guardianAddr eth_common.Address
ready bool
config *guardianConfig
}
func newMockGuardianSet(n int) []*mockGuardian {
type guardianConfig struct {
publicSocket string
adminSocket string
publicRpc string
publicWeb string
statusPort uint
p2pPort uint
}
func createGuardianConfig(testId uint, mockGuardianIndex uint) *guardianConfig {
return &guardianConfig{
publicSocket: fmt.Sprintf("/tmp/test_guardian_%d_public.socket", mockGuardianIndex+testId*20),
adminSocket: fmt.Sprintf("/tmp/test_guardian_%d_admin.socket", mockGuardianIndex+testId*20), // TODO consider using os.CreateTemp("/tmp", "test_guardian_adminXXXXX.socket"),
publicRpc: fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_RPC_PORTRANGE_START+testId*20),
publicWeb: fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_PUBLICWEB_PORTRANGE_START+testId*20),
statusPort: mockGuardianIndex + LOCAL_STATUS_PORTRANGE_START + testId*20,
p2pPort: mockGuardianIndex + LOCAL_P2P_PORTRANGE_START + testId*20,
}
}
func newMockGuardianSet(testId uint, n int) []*mockGuardian {
gs := make([]*mockGuardian, n)
for i := 0; i < n; i++ {
@@ -94,6 +115,7 @@ func newMockGuardianSet(n int) []*mockGuardian {
MockSetC: make(chan *common.GuardianSet),
gk: gk,
guardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey),
config: createGuardianConfig(testId, uint(i)),
}
}
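The createGuardianConfig scheme above reserves a disjoint block of 20 ports and socket paths per test run. A minimal standalone sketch of the idea, using placeholder base ports (the LOCAL_*_PORTRANGE_START constants in node_test.go are not shown in this diff):

package main

import "fmt"

// Placeholder base port; node_test.go defines its own LOCAL_* constants.
const (
	localStatusPortRangeStart uint = 11000
	blockSize                 uint = 20 // each test reserves a block of 20
)

func statusPort(testId, guardianIndex uint) uint {
	// Tests with distinct testId values can never collide, as long as
	// guardianIndex stays below the block size.
	return guardianIndex + localStatusPortRangeStart + testId*blockSize
}

func main() {
	fmt.Println(statusPort(0, 3)) // 11003
	fmt.Println(statusPort(1, 3)) // 11023: same guardian index, next test's block
}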
@@ -108,32 +130,8 @@ func mockGuardianSetToGuardianAddrList(gs []*mockGuardian) []eth_common.Address
return result
}
func mockPublicSocket(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_public.socket", mockGuardianIndex+testId*20)
}
func mockAdminStocket(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_admin.socket", mockGuardianIndex+testId*20)
}
func mockPublicRpc(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_RPC_PORTRANGE_START+testId*20)
}
func mockPublicWeb(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_PUBLICWEB_PORTRANGE_START+testId*20)
}
func mockStatusPort(testId uint, mockGuardianIndex uint) uint {
return mockGuardianIndex + LOCAL_STATUS_PORTRANGE_START + testId*20
}
func mockP2PPort(testId uint, mockGuardianIndex uint) uint {
return mockGuardianIndex + LOCAL_P2P_PORTRANGE_START + testId*20
}
// mockGuardianRunnable returns a runnable that first sets up a mock guardian and then runs it.
func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uint, obsDb mock.ObservationDb) supervisor.Runnable {
func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock.ObservationDb) supervisor.Runnable {
return func(ctx context.Context) error {
// Create a sub-context with cancel function that we can pass to G.run.
ctx, ctxCancel := context.WithCancel(ctx)
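With the config attached to each mockGuardian at construction, the runnable no longer needs testId threaded through its signature; it reads everything from gs[mockGuardianIndex].config. A hedged sketch of that general pattern, with illustrative names rather than the node package's API:

package main

import (
	"context"
	"fmt"
)

type nodeConfig struct{ publicRpc string }

type node struct{ config *nodeConfig }

// Config is derived once, at construction time, from (testId, index)...
func newNodes(testId uint, n int) []*node {
	nodes := make([]*node, n)
	for i := uint(0); i < uint(n); i++ {
		nodes[i] = &node{config: &nodeConfig{
			publicRpc: fmt.Sprintf("127.0.0.1:%d", 20000+testId*20+i),
		}}
	}
	return nodes
}

// ...so the runnable closure only needs the slice and its own index.
func runnable(nodes []*node, index uint) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		cfg := nodes[index].config // no testId parameter required anymore
		fmt.Println("would listen on", cfg.publicRpc)
		return nil
	}
}

func main() {
	nodes := newNodes(1, 3)
	_ = runnable(nodes, 0)(context.Background())
}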
@@ -165,32 +163,28 @@ func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uin
if err != nil {
return err
}
bootstrapPeers := fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", mockP2PPort(testId, 0), zeroPeerId.String())
p2pPort := mockP2PPort(testId, mockGuardianIndex)
// configure publicRpc
publicSocketPath := mockPublicSocket(testId, mockGuardianIndex)
publicRpc := mockPublicRpc(testId, mockGuardianIndex)
bootstrapPeers := fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", gs[0].config.p2pPort, zeroPeerId.String())
// configure adminservice
adminSocketPath := mockAdminStocket(testId, mockGuardianIndex)
rpcMap := make(map[string]string)
// We set this to None because we don't want to count these logs when counting the amount of logs generated per message
publicRpcLogDetail := common.GrpcLogDetailNone
cfg := gs[mockGuardianIndex].config
// assemble all the options
guardianOptions := []*GuardianOption{
GuardianOptionDatabase(db),
GuardianOptionWatchers(watcherConfigs, nil),
GuardianOptionNoAccountant(), // disable accountant
GuardianOptionGovernor(true),
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, p2pPort, func() string { return "" }),
GuardianOptionPublicRpcSocket(publicSocketPath, publicRpcLogDetail),
GuardianOptionPublicrpcTcpService(publicRpc, publicRpcLogDetail),
GuardianOptionPublicWeb(mockPublicWeb(testId, mockGuardianIndex), publicSocketPath, "", false, ""),
GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(testId, mockGuardianIndex))),
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, cfg.p2pPort, func() string { return "" }),
GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail),
GuardianOptionPublicrpcTcpService(cfg.publicRpc, publicRpcLogDetail),
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)),
GuardianOptionProcessor(),
}
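The GuardianOption* list is Go's functional-options style. A generic sketch of the pattern (the real GuardianOption type in the node package also carries option names and dependency ordering, which this leaves out):

package main

import "fmt"

type server struct {
	statusAddr string
	adminPath  string
}

// option mutates the server under construction; each with* constructor
// captures its arguments in a closure, like the GuardianOption* helpers.
type option func(*server)

func withStatusServer(addr string) option { return func(s *server) { s.statusAddr = addr } }
func withAdminService(path string) option { return func(s *server) { s.adminPath = path } }

func newServer(opts ...option) *server {
	s := &server{}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	s := newServer(
		withStatusServer("[::]:11003"),
		withAdminService("/tmp/test_admin.socket"),
	)
	fmt.Printf("%+v\n", s)
}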
@@ -253,14 +247,14 @@ func waitForHeartbeatsInLogs(t testing.TB, zapObserver *observer.ObservedLogs, g
// WARNING: Currently, there is only a global registry for all prometheus metrics, leading to all guardian nodes writing to the same one.
//
// As long as this is the case, you probably don't want to use this function.
func waitForPromMetricGte(t testing.TB, testId uint, ctx context.Context, gs []*mockGuardian, metric string, min int) {
func waitForPromMetricGte(t testing.TB, ctx context.Context, gs []*mockGuardian, metric string, min int) {
metricBytes := []byte(metric)
requests := make([]*http.Request, len(gs))
readyFlags := make([]bool, len(gs))
// create the prom api clients
for i := range gs {
url := fmt.Sprintf("http://localhost:%d/metrics", mockStatusPort(testId, uint(i)))
url := fmt.Sprintf("http://localhost:%d/metrics", gs[i].config.statusPort)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
assert.NoError(t, err)
requests[i] = req
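A simplified sketch of what checking a Prometheus text-format endpoint for a sample of metric >= min can look like; the helper and metric names below are hypothetical, and real code should parse the exposition format with github.com/prometheus/common/expfmt rather than this prefix scan:

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
)

// promMetricGte reports whether the endpoint at url exposes a line
// "<metric> <value>" with value >= min. Hypothetical helper for illustration.
func promMetricGte(ctx context.Context, url, metric string, min int) (bool, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return false, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}
	for _, line := range strings.Split(string(body), "\n") {
		rest, ok := strings.CutPrefix(line, metric+" ")
		if !ok {
			continue
		}
		v, err := strconv.ParseFloat(strings.TrimSpace(rest), 64)
		if err == nil && v >= float64(min) {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Placeholder port and metric name, not values from node_test.go.
	ok, err := promMetricGte(context.Background(),
		"http://localhost:11003/metrics", "example_heartbeats_received_total", 1)
	fmt.Println(ok, err)
}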
@@ -582,13 +576,13 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
logger := supervisor.Logger(ctx)
// create the Guardian Set
gs := newMockGuardianSet(numGuardians)
gs := newMockGuardianSet(testId, numGuardians)
obsDb := makeObsDb(testCases)
// run the guardians
for i := 0; i < numGuardians; i++ {
gRun := mockGuardianRunnable(testId, gs, uint(i), obsDb)
gRun := mockGuardianRunnable(gs, uint(i), obsDb)
err := supervisor.Run(ctx, fmt.Sprintf("g-%d", i), gRun)
if i == 0 && numGuardians > 1 {
time.Sleep(time.Second) // give the bootstrap guardian some time to start up
@@ -609,8 +603,8 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
}
// wait for the status server to come online and check that it works
for i := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(testId, uint(i))))
for _, g := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", g.config.statusPort))
assert.NoError(t, err)
}
@@ -619,7 +613,7 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS)
assert.False(t, WAIT_FOR_LOGS && WAIT_FOR_METRICS) // can't do both, because they both write to gs[].ready
if WAIT_FOR_METRICS {
waitForPromMetricGte(t, testId, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1)
waitForPromMetricGte(t, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1)
}
if WAIT_FOR_LOGS {
waitForHeartbeatsInLogs(t, zapObserver, gs)
@@ -645,14 +639,14 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
}
// Wait for adminrpc to come online
for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", mockAdminStocket(testId, adminRpcGuardianIndex))).Len() == 0 {
for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[adminRpcGuardianIndex].config.adminSocket)).Len() == 0 {
logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// Send manual re-observation requests
func() { // put this in own function to use defer
s := fmt.Sprintf("unix:///%s", mockAdminStocket(testId, vaaCheckGuardianIndex))
s := fmt.Sprintf("unix:///%s", gs[adminRpcGuardianIndex].config.adminSocket)
conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
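The dial target above uses grpc-go's unix scheme. A minimal self-contained sketch of dialing an admin-style unix socket with insecure credentials; the socket path is a made-up example:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// The "unix://" target scheme tells grpc-go to dial a unix domain socket.
	conn, err := grpc.DialContext(ctx, "unix:///tmp/example_admin.socket",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	log.Println("connected:", conn.Target())
}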
@@ -677,14 +671,14 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
}()
// Wait for publicrpc to come online
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(testId, vaaCheckGuardianIndex))).Len() == 0 {
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", gs[vaaCheckGuardianIndex].config.publicRpc)).Len() == 0 {
logger.Info("publicrpc seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// check that the VAAs were generated
logger.Info("Connecting to publicrpc...")
conn, err := grpc.DialContext(ctx, mockPublicRpc(testId, vaaCheckGuardianIndex), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.DialContext(ctx, gs[vaaCheckGuardianIndex].config.publicRpc, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
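The wait loops here and above rely on zap's observer core, which records structured log entries for later querying. A minimal sketch of that mechanism, with a made-up address:

package main

import (
	"fmt"

	"go.uber.org/zap"
	"go.uber.org/zap/zaptest/observer"
)

func main() {
	// observer.New returns a core that stores entries plus a handle to query them.
	core, observed := observer.New(zap.InfoLevel)
	logger := zap.New(core)

	logger.Info("publicrpc server listening", zap.String("addr", "127.0.0.1:20000"))

	// FilterMessage narrows by message text, FilterField by a structured field.
	n := observed.FilterMessage("publicrpc server listening").
		FilterField(zap.String("addr", "127.0.0.1:20000")).Len()
	fmt.Println(n) // 1
}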
@@ -1022,13 +1016,13 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages
logger := supervisor.Logger(ctx)
// create the Guardian Set
gs := newMockGuardianSet(numGuardians)
gs := newMockGuardianSet(testId, numGuardians)
var obsDb mock.ObservationDb = nil // TODO
// run the guardians
for i := 0; i < numGuardians; i++ {
gRun := mockGuardianRunnable(testId, gs, uint(i), obsDb)
gRun := mockGuardianRunnable(gs, uint(i), obsDb)
err := supervisor.Run(ctx, fmt.Sprintf("g-%d", i), gRun)
if i == 0 && numGuardians > 1 {
time.Sleep(time.Second) // give the bootstrap guardian some time to start up
@@ -1049,8 +1043,8 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages
}
// wait for the status server to come online and check that it works
for i := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(testId, uint(i))))
for _, g := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", g.config.statusPort))
assert.NoError(t, err)
}
@@ -1058,7 +1052,7 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages
// This is necessary because if they have not joined the p2p network yet, gossip messages may get dropped silently.
assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS)
if WAIT_FOR_METRICS {
waitForPromMetricGte(t, testId, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1)
waitForPromMetricGte(t, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1)
}
if WAIT_FOR_LOGS {
waitForHeartbeatsInLogs(t, zapObserver, gs)
@@ -1066,12 +1060,12 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages
logger.Info("All Guardians have received at least one heartbeat.")
// Wait for publicrpc to come online.
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(testId, 0))).Len() == 0 {
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", gs[0].config.publicRpc)).Len() == 0 {
logger.Info("publicrpc seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// now that it's online, connect to publicrpc of guardian-0
conn, err := grpc.DialContext(ctx, mockPublicRpc(testId, 0), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.DialContext(ctx, gs[0].config.publicRpc, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
c := publicrpcv1.NewPublicRPCServiceClient(conn)
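From here the benchmark can issue publicrpc calls through c. A hedged example of one simple call on that client; the method and field names follow the generated publicrpcv1 package as best recalled, so verify them against the proto before relying on this:

// Hypothetical follow-up call on the client c created above; ctx, t, and
// logger are the ones already in scope in benchmarkConsensus.
resp, err := c.GetCurrentGuardianSet(ctx, &publicrpcv1.GetCurrentGuardianSetRequest{})
require.NoError(t, err)
logger.Info("guardian set", zap.Uint32("index", resp.GuardianSet.Index))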