diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index e1ce0c08c..82370fd56 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -1,6 +1,8 @@ package node import ( + "bufio" + "bytes" "context" "crypto/ecdsa" "crypto/rand" @@ -43,6 +45,11 @@ const LOCAL_RPC_PORTRANGE_START = 10000 const LOCAL_P2P_PORTRANGE_START = 10100 const LOCAL_STATUS_PORTRANGE_START = 10200 +var PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED = []byte("wormhole_p2p_broadcast_messages_received_total{type=\"valid_heartbeat\"}") + +const WAIT_FOR_LOGS = true +const WAIT_FOR_METRICS = false + type mockGuardian struct { p2pKey libp2p_crypto.PrivKey MockObservationC chan *common.MessagePublication @@ -183,7 +190,7 @@ func setupLogsCapture() (*zap.Logger, *observer.ObservedLogs) { return logger, logs } -func waitForHeartbeats(t *testing.T, zapObserver *observer.ObservedLogs, gs []*mockGuardian) { +func waitForHeartbeatsInLogs(t *testing.T, zapObserver *observer.ObservedLogs, gs []*mockGuardian) { // example log entry that we're looking for: // DEBUG root.g-2.g.p2p p2p/p2p.go:465 valid signed heartbeat received {"value": "node_name:\"g-0\" timestamp:1685677055425243683 version:\"development\" guardian_addr:\"0xeF2a03eAec928DD0EEAf35aD31e34d2b53152c07\" boot_timestamp:1685677040424855922 p2p_node_id:\"\\x00$\\x08\\x01\\x12 \\x97\\xf3\\xbd\\x87\\x13\\x15(\\x1e\\x8b\\x83\\xedǩ\\xfd\\x05A\\x06aTD\\x90p\\xcc\\xdb<\\xddB\\xcfi\\xccވ\"", "from": "12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"} // TODO maybe instead of looking at log entries, we could determine this status through prometheus metrics, which might be more stable @@ -213,6 +220,51 @@ func waitForHeartbeats(t *testing.T, zapObserver *observer.ObservedLogs, gs []*m } } +func waitForHeartbeatsInMetrics(t *testing.T, ctx context.Context, gs []*mockGuardian) { + requests := make([]*http.Request, len(gs)) + //logger := supervisor.Logger(ctx) + + // create the prom api clients + for i := range gs { + url := fmt.Sprintf("http://localhost:%d/metrics", mockStatusPort(uint(i))) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + assert.NoError(t, err) + requests[i] = req + } + + // query them + for readyCounter := 0; readyCounter < len(gs); { + for i, g := range gs { + if g.ready { + continue + } + + resp, err := http.DefaultClient.Do(requests[i]) + if err != nil { + resp.Body.Close() + continue + } + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Bytes() + if bytes.HasPrefix(line, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED) { + res, err := strconv.Atoi(string(bytes.Split(line, []byte(" "))[1])) // split at the space and convert to integer + assert.NoError(t, err) + if res > 0 { + g.ready = true + readyCounter++ + break + } + } + } + + //logger.Info("node not ready yet", zap.Int("i", i)) + } + time.Sleep(time.Second * 5) + } +} + type testCase struct { msg *common.MessagePublication // a Wormhole message // number of Guardians who will initially observe this message through the mock watcher @@ -401,13 +453,19 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) { // wait for the status server to come online and check that it works for i := range gs { - err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d", mockStatusPort(uint(i)))) + err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(uint(i)))) assert.NoError(t, err) } // Wait for them to connect each other and receive at least one heartbeat. // This is necessary because if they have not joined the p2p network yet, gossip messages may get dropped silently. - waitForHeartbeats(t, zapObserver, gs) + assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS) + if WAIT_FOR_METRICS { + waitForHeartbeatsInMetrics(t, ctx, gs) + } + if WAIT_FOR_LOGS { + waitForHeartbeatsInLogs(t, zapObserver, gs) + } logger.Info("All Guardians have received at least one heartbeat.") // have them make observations