node: Add benchmarks

tbjump 2023-06-27 13:39:25 +00:00 committed by tbjump
parent 9c668072ef
commit 2429f51ee3
1 changed file with 329 additions and 70 deletions


@@ -7,6 +7,7 @@ import (
"crypto/ecdsa"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"math/big"
@@ -19,6 +20,8 @@ import (
"testing"
"time"
"sync/atomic"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/devnet"
@@ -47,9 +50,9 @@ import (
)
const LOCAL_RPC_PORTRANGE_START = 10000
const LOCAL_P2P_PORTRANGE_START = 10100
const LOCAL_STATUS_PORTRANGE_START = 10200
const LOCAL_PUBLICWEB_PORTRANGE_START = 10300
const LOCAL_P2P_PORTRANGE_START = 11000
const LOCAL_STATUS_PORTRANGE_START = 12000
const LOCAL_PUBLICWEB_PORTRANGE_START = 13000
var PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED = "wormhole_p2p_broadcast_messages_received_total{type=\"valid_heartbeat\"}"
@@ -57,7 +60,13 @@ const WAIT_FOR_LOGS = true
const WAIT_FOR_METRICS = false
// The level at which logs will be written to the console. During testing, logs are produced and buffered at Debug level, because some tests need to look for certain entries.
const CONSOLE_LOG_LEVEL = zap.InfoLevel
var CONSOLE_LOG_LEVEL = zap.InfoLevel
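// TEST_ID_CTR hands out a unique id to each test that runs in this process; the id is used below to offset socket paths and port numbers so that concurrently running tests do not collide.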
var TEST_ID_CTR atomic.Uint32
func getTestId() uint {
return uint(TEST_ID_CTR.Add(1))
}
type mockGuardian struct {
p2pKey libp2p_crypto.PrivKey
@@ -98,28 +107,32 @@ func mockGuardianSetToGuardianAddrList(gs []*mockGuardian) []eth_common.Address
return result
}
func mockPublicSocket(mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_public.socket", mockGuardianIndex)
func mockPublicSocket(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_public.socket", mockGuardianIndex+testId*20)
}
func mockAdminStocket(mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_admin.socket", mockGuardianIndex)
func mockAdminStocket(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_admin.socket", mockGuardianIndex+testId*20)
}
func mockPublicRpc(mockGuardianIndex uint) string {
return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_RPC_PORTRANGE_START)
func mockPublicRpc(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_RPC_PORTRANGE_START+testId*20)
}
func mockPublicWeb(mockGuardianIndex uint) string {
return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_PUBLICWEB_PORTRANGE_START)
func mockPublicWeb(testId uint, mockGuardianIndex uint) string {
return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_PUBLICWEB_PORTRANGE_START+testId*20)
}
func mockStatusPort(mockGuardianIndex uint) uint {
return mockGuardianIndex + LOCAL_STATUS_PORTRANGE_START
func mockStatusPort(testId uint, mockGuardianIndex uint) uint {
return mockGuardianIndex + LOCAL_STATUS_PORTRANGE_START + testId*20
}
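// mockP2PPort returns the p2p listen port for a guardian. Like the other mock* helpers above, it offsets the port by testId*20, giving every test its own block of 20 ports within the range.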
func mockP2PPort(testId uint, mockGuardianIndex uint) uint {
return mockGuardianIndex + LOCAL_P2P_PORTRANGE_START + testId*20
}
// mockGuardianRunnable returns a runnable that first sets up a mock guardian and then runs it.
func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock.ObservationDb) supervisor.Runnable {
func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uint, obsDb mock.ObservationDb) supervisor.Runnable {
return func(ctx context.Context) error {
// Create a sub-context with cancel function that we can pass to G.run.
ctx, ctxCancel := context.WithCancel(ctx)
@@ -151,15 +164,15 @@ func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock
if err != nil {
return err
}
bootstrapPeers := fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", LOCAL_P2P_PORTRANGE_START, zeroPeerId.String())
p2pPort := uint(LOCAL_P2P_PORTRANGE_START + mockGuardianIndex)
bootstrapPeers := fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", mockP2PPort(testId, 0), zeroPeerId.String())
p2pPort := mockP2PPort(testId, mockGuardianIndex)
// configure publicRpc
publicSocketPath := mockPublicSocket(mockGuardianIndex)
publicRpc := mockPublicRpc(mockGuardianIndex)
publicSocketPath := mockPublicSocket(testId, mockGuardianIndex)
publicRpc := mockPublicRpc(testId, mockGuardianIndex)
// configure adminservice
adminSocketPath := mockAdminStocket(mockGuardianIndex)
adminSocketPath := mockAdminStocket(testId, mockGuardianIndex)
rpcMap := make(map[string]string)
// assemble all the options
@@ -171,9 +184,9 @@ func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, p2pPort, func() string { return "" }),
GuardianOptionPublicRpcSocket(publicSocketPath, common.GrpcLogDetailFull),
GuardianOptionPublicrpcTcpService(publicRpc, common.GrpcLogDetailFull),
GuardianOptionPublicWeb(mockPublicWeb(mockGuardianIndex), publicSocketPath, "", false, ""),
GuardianOptionPublicWeb(mockPublicWeb(testId, mockGuardianIndex), publicSocketPath, "", false, ""),
GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(mockGuardianIndex))),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(testId, mockGuardianIndex))),
GuardianOptionProcessor(),
}
@@ -187,6 +200,9 @@ func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock
}
<-ctx.Done()
time.Sleep(time.Second * 1) // Wait 1s for all sorts of things to complete.
db.Close() // close BadgerDb
return nil
}
}
@@ -224,7 +240,7 @@ func waitForHeartbeatsInLogs(t testing.TB, zapObserver *observer.ObservedLogs, g
}
}
}
time.Sleep(time.Microsecond * 100)
time.Sleep(time.Millisecond)
}
}
@@ -232,14 +248,15 @@ func waitForHeartbeatsInLogs(t testing.TB, zapObserver *observer.ObservedLogs, g
// WARNING: Currently, there is only a global registry for all prometheus metrics, leading to all guardian nodes writing to the same one.
//
// As long as this is the case, you probably don't want to use this function.
func waitForPromMetricGte(t testing.TB, ctx context.Context, gs []*mockGuardian, metric string, min int) {
func waitForPromMetricGte(t testing.TB, testId uint, ctx context.Context, gs []*mockGuardian, metric string, min int) {
metricBytes := []byte(metric)
requests := make([]*http.Request, len(gs))
readyFlags := make([]bool, len(gs))
//logger := supervisor.Logger(ctx)
// create the prom api clients
for i := range gs {
url := fmt.Sprintf("http://localhost:%d/metrics", mockStatusPort(uint(i)))
url := fmt.Sprintf("http://localhost:%d/metrics", mockStatusPort(testId, uint(i)))
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
assert.NoError(t, err)
requests[i] = req
@@ -251,8 +268,8 @@ func waitForPromMetricGte(t testing.TB, ctx context.Context, gs []*mockGuardian,
// query them
for readyCounter := 0; readyCounter < len(gs); {
for i, g := range gs {
if g.ready {
for i := range gs {
if readyFlags[i] {
continue
}
@@ -279,12 +296,39 @@ func waitForPromMetricGte(t testing.TB, ctx context.Context, gs []*mockGuardian,
}()
if ready {
g.ready = true
readyFlags[i] = true
readyCounter++
}
}
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 5) // TODO
}
}
// waitForVaa polls the publicRpc service every 10ms until there is a response.
func waitForVaa(ctx context.Context, c publicrpcv1.PublicRPCServiceClient, msgId *publicrpcv1.MessageID, mustNotReachQuorum bool) (*publicrpcv1.GetSignedVAAResponse, error) {
var r *publicrpcv1.GetSignedVAAResponse
var err error
//logger := supervisor.Logger(ctx)
for {
select {
case <-ctx.Done():
return nil, errors.New("context canceled")
default:
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
r, err = c.GetSignedVAA(queryCtx, &publicrpcv1.GetSignedVAARequest{MessageId: msgId})
queryCancel()
}
if err == nil && r != nil {
// success
return r, err
}
if mustNotReachQuorum {
// no need to re-try because we're expecting an error.
return r, err
}
time.Sleep(time.Millisecond * 10)
}
}
@@ -309,6 +353,8 @@ func randomTime() time.Time {
}
var someMsgSequenceCounter uint64 = 0
var someMsgEmitter vaa.Address = [32]byte{1, 2, 3}
var someMsgEmitterChain vaa.ChainID = vaa.ChainIDSolana
func someMessage() *common.MessagePublication {
someMsgSequenceCounter++
@@ -318,8 +364,8 @@ func someMessage() *common.MessagePublication {
Nonce: math_rand.Uint32(), //nolint
Sequence: someMsgSequenceCounter,
ConsistencyLevel: 1,
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: [32]byte{1, 2, 3},
EmitterChain: someMsgEmitterChain,
EmitterAddress: someMsgEmitter,
Payload: []byte{},
Unreliable: false,
}
@@ -452,7 +498,7 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
// TestBasicConsensus tests that a set of guardians can form consensus on certain messages and reject certain other messages
// TestConsensus tests that a set of guardians can form consensus on certain messages and reject certain other messages
func TestConsensus(t *testing.T) {
const numGuardians = 4 // Quorum will be 3 out of 4 guardians.
@@ -522,6 +568,7 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something)
const vaaCheckGuardianIndex uint = 0 // we will query this guardian's publicrpc for VAAs
const adminRpcGuardianIndex uint = 0 // we will query this guardian's adminRpc
testId := getTestId()
// Test's main lifecycle context.
rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), testTimeout)
@@ -539,7 +586,7 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
// run the guardians
for i := 0; i < numGuardians; i++ {
gRun := mockGuardianRunnable(gs, uint(i), obsDb)
gRun := mockGuardianRunnable(testId, gs, uint(i), obsDb)
err := supervisor.Run(ctx, fmt.Sprintf("g-%d", i), gRun)
if i == 0 && numGuardians > 1 {
time.Sleep(time.Second) // give the bootstrap guardian some time to start up
@@ -561,7 +608,7 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
// wait for the status server to come online and check that it works
for i := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(uint(i))))
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(testId, uint(i))))
assert.NoError(t, err)
}
@@ -570,7 +617,7 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS)
assert.False(t, WAIT_FOR_LOGS && WAIT_FOR_METRICS) // can't do both, because they both write to gs[].ready
if WAIT_FOR_METRICS {
waitForPromMetricGte(t, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1)
waitForPromMetricGte(t, testId, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1)
}
if WAIT_FOR_LOGS {
waitForHeartbeatsInLogs(t, zapObserver, gs)
@@ -596,14 +643,14 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
}
// Wait for adminrpc to come online
for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", mockAdminStocket(adminRpcGuardianIndex))).Len() == 0 {
for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", mockAdminStocket(testId, adminRpcGuardianIndex))).Len() == 0 {
logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// Send manual re-observation requests
func() { // put this in own function to use defer
s := fmt.Sprintf("unix:///%s", mockAdminStocket(vaaCheckGuardianIndex))
s := fmt.Sprintf("unix:///%s", mockAdminStocket(testId, vaaCheckGuardianIndex))
conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
@@ -628,14 +675,14 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
}()
// Wait for publicrpc to come online
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(vaaCheckGuardianIndex))).Len() == 0 {
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(testId, vaaCheckGuardianIndex))).Len() == 0 {
logger.Info("publicrpc seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// check that the VAAs were generated
logger.Info("Connecting to publicrpc...")
conn, err := grpc.DialContext(ctx, mockPublicRpc(vaaCheckGuardianIndex), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.DialContext(ctx, mockPublicRpc(testId, vaaCheckGuardianIndex), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
@@ -650,38 +697,12 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
logger.Info("Checking result of testcase", zap.Int("test_case", i))
// poll the API until we get a response without error
var r *publicrpcv1.GetSignedVAAResponse
var err error
for {
select {
case <-ctx.Done():
break
default:
// timeout for grpc query
logger.Info("attempting to query for VAA", zap.Int("test_case", i))
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
r, err = c.GetSignedVAA(queryCtx, &publicrpcv1.GetSignedVAARequest{
MessageId: &publicrpcv1.MessageID{
EmitterChain: publicrpcv1.ChainID(msg.EmitterChain),
EmitterAddress: msg.EmitterAddress.String(),
Sequence: msg.Sequence,
},
})
queryCancel()
if err != nil {
logger.Info("error querying for VAA. Trying agin in 100ms.", zap.Int("test_case", i), zap.Error(err))
}
}
if err == nil && r != nil {
logger.Info("Received VAA from publicrpc", zap.Int("test_case", i), zap.Binary("vaa_bytes", r.VaaBytes))
break
}
if testCase.mustNotReachQuorum {
// no need to re-try because we're expecting an error. (and later we'll assert that's indeed an error)
break
}
time.Sleep(time.Millisecond * 100)
msgId := &publicrpcv1.MessageID{
EmitterChain: publicrpcv1.ChainID(msg.EmitterChain),
EmitterAddress: msg.EmitterAddress.String(),
Sequence: msg.Sequence,
}
r, err := waitForVaa(ctx, c, msgId, testCase.mustNotReachQuorum)
assert.NotEqual(t, testCase.mustNotReachQuorum, testCase.mustReachQuorum) // either or
if testCase.mustNotReachQuorum {
@@ -881,3 +902,241 @@ func (c fatalHook) OnWrite(ce *zapcore.CheckedEntry, fields []zapcore.Field) {
c <- sb.String()
panic(ce.Message)
}
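// signingMsgs returns n 32-byte Keccak256 digests to use as signing inputs in the benchmarks below.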
func signingMsgs(n int) [][]byte {
msgs := make([][]byte, n)
for i := 0; i < len(msgs); i++ {
msgs[i] = ethcrypto.Keccak256Hash([]byte{byte(i)}).Bytes()
}
return msgs
}
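// signMsgsP2p signs every digest with the given libp2p (Ed25519) private key and returns the signatures.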
func signMsgsP2p(pk libp2p_crypto.PrivKey, msgs [][]byte) [][]byte {
n := len(msgs)
signatures := make([][]byte, n)
// Ed25519.Sign
for i := 0; i < n; i++ {
sig, err := pk.Sign(msgs[i])
if err != nil {
panic(err)
}
signatures[i] = sig
}
return signatures
}
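// signMsgsEth signs every digest with the given secp256k1 key via ethcrypto.Sign, producing 65-byte recoverable signatures.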
func signMsgsEth(pk *ecdsa.PrivateKey, msgs [][]byte) [][]byte {
n := len(msgs)
signatures := make([][]byte, n)
// secp256k1 ECDSA sign (ethcrypto.Sign)
for i := 0; i < n; i++ {
sig, err := ethcrypto.Sign(msgs[i], pk)
if err != nil {
panic(err)
}
signatures[i] = sig
}
return signatures
}
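// BenchmarkCrypto compares signing and verification throughput for the two key types a guardian uses: the libp2p Ed25519 p2p key and the secp256k1 guardian signing key.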
func BenchmarkCrypto(b *testing.B) {
b.Run("libp2p (Ed25519)", func(b *testing.B) {
p2pKey := devnet.DeterministicP2PPrivKeyByIndex(1)
b.Run("sign", func(b *testing.B) {
msgs := signingMsgs(b.N)
b.ResetTimer()
signMsgsP2p(p2pKey, msgs)
})
b.Run("verify", func(b *testing.B) {
msgs := signingMsgs(b.N)
signatures := signMsgsP2p(p2pKey, msgs)
b.ResetTimer()
// Ed25519.Verify
for i := 0; i < b.N; i++ {
ok, err := p2pKey.GetPublic().Verify(msgs[i], signatures[i])
assert.NoError(b, err)
assert.True(b, ok)
}
})
})
b.Run("ethcrypto (secp256k1)", func(b *testing.B) {
gk := devnet.InsecureDeterministicEcdsaKeyByIndex(ethcrypto.S256(), 0)
//gkc := ethcrypto.CompressPubkey(&gk.PublicKey)
b.Run("sign", func(b *testing.B) {
msgs := signingMsgs(b.N)
b.ResetTimer()
signMsgsEth(gk, msgs)
})
b.Run("verify", func(b *testing.B) {
msgs := signingMsgs(b.N)
signatures := signMsgsEth(gk, msgs)
b.ResetTimer()
// secp256k1 public key recovery (ethcrypto.Ecrecover)
for i := 0; i < b.N; i++ {
_, err := ethcrypto.Ecrecover(msgs[i], signatures[i])
assert.NoError(b, err)
//assert.Equal(b, signer, gkc)
}
})
})
}
// How to run: go test -v -ldflags '-extldflags "-Wl,--allow-multiple-definition" ' -bench ^BenchmarkConsensus -benchtime=1x -count 1 -run ^$ > bench.log; tail bench.log
func BenchmarkConsensus(b *testing.B) {
require.Equal(b, b.N, 1)
//CONSOLE_LOG_LEVEL = zap.DebugLevel
//CONSOLE_LOG_LEVEL = zap.InfoLevel
CONSOLE_LOG_LEVEL = zap.WarnLevel
benchmarkConsensus(b, "1", 19, 1000, 2) // ~28s
//benchmarkConsensus(b, "1", 19, 100, 2) // ~2s
//benchmarkConsensus(b, "1", 19, 100, 3) // sometimes fails, i.e. too much parallelism
//benchmarkConsensus(b, "1", 19, 100, 10) // sometimes fails, i.e. too much parallelism
//benchmarkConsensus(b, "1", 19, 100, 1) // 3s
//benchmarkConsensus(b, "1", 19, 20, 1) // 0.6s
//benchmarkConsensus(b, "1", 19, 5, 1) // 0.2s
}
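// benchmarkConsensus spins up numGuardians mock guardians, feeds them numMessages observations while keeping at most maxPendingObs observations in flight, and measures how long it takes until a signed VAA can be fetched from guardian-0's publicrpc for each message.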
func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages int, maxPendingObs int) {
t.Run(name, func(t *testing.B) {
require.Equal(t, t.N, 1)
testId := getTestId()
msgSeqStart := someMsgSequenceCounter
const testTimeout = time.Minute * 2
const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something)
// Test's main lifecycle context.
rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), testTimeout)
defer rootCtxCancel()
zapLogger, zapObserver := setupLogsCapture()
supervisor.New(rootCtx, zapLogger, func(ctx context.Context) error {
logger := supervisor.Logger(ctx)
// create the Guardian Set
gs := newMockGuardianSet(numGuardians)
var obsDb mock.ObservationDb = nil // TODO
// run the guardians
for i := 0; i < numGuardians; i++ {
gRun := mockGuardianRunnable(testId, gs, uint(i), obsDb)
err := supervisor.Run(ctx, fmt.Sprintf("g-%d", i), gRun)
if i == 0 && numGuardians > 1 {
time.Sleep(time.Second) // give the bootstrap guardian some time to start up
}
assert.NoError(t, err)
}
logger.Info("All Guardians initiated.")
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Inform them of the Guardian Set
commonGuardianSet := common.GuardianSet{
Keys: mockGuardianSetToGuardianAddrList(gs),
Index: guardianSetIndex,
}
for i, g := range gs {
logger.Info("Sending guardian set update", zap.Int("guardian_index", i))
g.MockSetC <- &commonGuardianSet
}
// wait for the status server to come online and check that it works
for i := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d/metrics", mockStatusPort(testId, uint(i))))
assert.NoError(t, err)
}
// Wait for them to connect to each other and receive at least one heartbeat.
// This is necessary because if they have not joined the p2p network yet, gossip messages may get dropped silently.
assert.True(t, WAIT_FOR_LOGS || WAIT_FOR_METRICS)
if WAIT_FOR_METRICS {
waitForPromMetricGte(t, testId, ctx, gs, PROMETHEUS_METRIC_VALID_HEARTBEAT_RECEIVED, 1)
}
if WAIT_FOR_LOGS {
waitForHeartbeatsInLogs(t, zapObserver, gs)
}
logger.Info("All Guardians have received at least one heartbeat.")
// Wait for publicrpc to come online.
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(testId, 0))).Len() == 0 {
logger.Info("publicrpc seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// now that it's online, connect to publicrpc of guardian-0
conn, err := grpc.DialContext(ctx, mockPublicRpc(testId, 0), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
c := publicrpcv1.NewPublicRPCServiceClient(conn)
logger.Info("-----------Beginning benchmark-----------")
t.ResetTimer()
// nextObsReadyC ensures that there are not more than `maxPendingObs` observations pending at any given point in time.
nextObsReadyC := make(chan struct{}, maxPendingObs)
for j := 0; j < maxPendingObs; j++ {
nextObsReadyC <- struct{}{}
}
go func() {
// feed observations to nodes
for i := 0; i < numMessages; i++ {
select {
case <-ctx.Done():
return
case <-nextObsReadyC:
msg := someMessage()
for _, g := range gs {
msgCopy := *msg
g.MockObservationC <- &msgCopy
}
}
}
}()
// check that the VAAs were generated
for i := 0; i < numMessages; i++ {
msgId := &publicrpcv1.MessageID{
EmitterChain: publicrpcv1.ChainID(someMsgEmitterChain),
EmitterAddress: someMsgEmitter.String(),
Sequence: msgSeqStart + uint64(i+1),
}
// a VAA should not take longer than 10s to be produced, no matter what.
waitCtx, cancelFunc := context.WithTimeout(ctx, time.Second*10)
_, err := waitForVaa(waitCtx, c, msgId, false)
cancelFunc()
assert.NoError(t, err)
if err != nil {
// early cancel the benchmark
rootCtxCancel()
}
nextObsReadyC <- struct{}{}
}
// We're done!
logger.Info("Tests completed.")
t.StopTimer()
supervisor.Signal(ctx, supervisor.SignalDone)
rootCtxCancel()
return nil
},
supervisor.WithPropagatePanic)
<-rootCtx.Done()
assert.NotEqual(t, rootCtx.Err(), context.DeadlineExceeded)
zapLogger.Info("Test root context cancelled, exiting...")
// Wait for everything to shut down gracefully. TODO: since switching ports to be offset by `testId`, this is no longer necessary.
//time.Sleep(time.Second * 11) // 11s is needed to gracefully shutdown libp2p
time.Sleep(time.Second * 1) // 1s is needed to gracefully shutdown BadgerDB
})
}