node/node_test: add waitForHeartbeatsInMetrics

This commit is contained in:
tbjump 2023-06-08 20:18:38 +00:00 committed by tbjump
parent c3ec2206a2
commit 0988fd9320
1 changed files with 61 additions and 3 deletions

View File

@ -1,6 +1,8 @@
package node
import (
"bufio"
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
@ -43,6 +45,11 @@ const LOCAL_RPC_PORTRANGE_START = 10000
const LOCAL_P2P_PORTRANGE_START = 10100
const LOCAL_STATUS_PORTRANGE_START = 10200
var PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED = []byte("wormhole_p2p_broadcast_messages_received_total{type=\"valid_heartbeat\"}")
const WAIT_FOR_LOGS = true
const WAIT_FOR_METRICS = false
type mockGuardian struct {
p2pKey libp2p_crypto.PrivKey
MockObservationC chan *common.MessagePublication
@ -183,7 +190,7 @@ func setupLogsCapture() (*zap.Logger, *observer.ObservedLogs) {
return logger, logs
}
func waitForHeartbeats(t *testing.T, zapObserver *observer.ObservedLogs, gs []*mockGuardian) {
func waitForHeartbeatsInLogs(t *testing.T, zapObserver *observer.ObservedLogs, gs []*mockGuardian) {
// example log entry that we're looking for:
// DEBUG root.g-2.g.p2p p2p/p2p.go:465 valid signed heartbeat received {"value": "node_name:\"g-0\" timestamp:1685677055425243683 version:\"development\" guardian_addr:\"0xeF2a03eAec928DD0EEAf35aD31e34d2b53152c07\" boot_timestamp:1685677040424855922 p2p_node_id:\"\\x00$\\x08\\x01\\x12 \\x97\\xf3\\xbd\\x87\\x13\\x15(\\x1e\\x8b\\x83\\xedǩ\\xfd\\x05A\\x06aTD\\x90p\\xcc\\xdb<\\xddB\\xcfi\\xccވ\"", "from": "12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"}
// TODO maybe instead of looking at log entries, we could determine this status through prometheus metrics, which might be more stable
@ -213,6 +220,51 @@ func waitForHeartbeats(t *testing.T, zapObserver *observer.ObservedLogs, gs []*m
}
}
func waitForHeartbeatsInMetrics(t *testing.T, ctx context.Context, gs []*mockGuardian) {
requests := make([]*http.Request, len(gs))
//logger := supervisor.Logger(ctx)
// create the prom api clients
for i := range gs {
url := fmt.Sprintf("http://localhost:%d/metrics", mockStatusPort(uint(i)))
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
assert.NoError(t, err)
requests[i] = req
}
// query them
for readyCounter := 0; readyCounter < len(gs); {
for i, g := range gs {
if g.ready {
continue
}
resp, err := http.DefaultClient.Do(requests[i])
if err != nil {
resp.Body.Close()
continue
}
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Bytes()
if bytes.HasPrefix(line, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED) {
res, err := strconv.Atoi(string(bytes.Split(line, []byte(" "))[1])) // split at the space and convert to integer
assert.NoError(t, err)
if res > 0 {
g.ready = true
readyCounter++
break
}
}
}
//logger.Info("node not ready yet", zap.Int("i", i))
}
time.Sleep(time.Second * 5)
}
}
type testCase struct {
msg *common.MessagePublication // a Wormhole message
// number of Guardians who will initially observe this message through the mock watcher
@ -401,13 +453,19 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
// wait for the status server to come online and check that it works
for i := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d", mockStatusPort(uint(i))))
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(uint(i))))
assert.NoError(t, err)
}
// Wait for them to connect each other and receive at least one heartbeat.
// This is necessary because if they have not joined the p2p network yet, gossip messages may get dropped silently.
waitForHeartbeats(t, zapObserver, gs)
assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS)
if WAIT_FOR_METRICS {
waitForHeartbeatsInMetrics(t, ctx, gs)
}
if WAIT_FOR_LOGS {
waitForHeartbeatsInLogs(t, zapObserver, gs)
}
logger.Info("All Guardians have received at least one heartbeat.")
// have them make observations