From 2a06fd466807dae8eae8fb61643540277b3445ea Mon Sep 17 00:00:00 2001 From: tbjump Date: Wed, 19 Jul 2023 14:11:38 +0000 Subject: [PATCH] node/node_test: Test governance vaa injection --- node/pkg/node/node_test.go | 88 +++++++++++++++++++++++++++++++------- 1 file changed, 72 insertions(+), 16 deletions(-) diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index b149c37d5..370d90ef4 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -22,6 +22,7 @@ import ( "sync/atomic" + "github.com/certusone/wormhole/node/pkg/adminrpc" "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/db" "github.com/certusone/wormhole/node/pkg/devnet" @@ -64,6 +65,8 @@ const WAIT_FOR_METRICS = false // The level at which logs will be written to console; During testing, logs are produced and buffered at Info level, because some tests need to look for certain entries. var CONSOLE_LOG_LEVEL = zap.InfoLevel +const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something) + var TEST_ID_CTR atomic.Uint32 func getTestId() uint { @@ -341,9 +344,12 @@ func waitForVaa(t testing.TB, ctx context.Context, c publicrpcv1.PublicRPCServic } type testCase struct { - msg *common.MessagePublication // a Wormhole message + msg *common.MessagePublication // a Wormhole message + govMsg *nodev1.GovernanceMessage // protobuf representation of msg as governance message, if applicable. // number of Guardians who will initially observe this message through the mock watcher numGuardiansObserve int + // number of Guardians where the governance message will be injected through the adminrpc + numGuardiansInjectGov int // if true, Guardians will not observe this message in the mock watcher, if they receive a reobservation request for it unavailableInReobservation bool // if true, the test environment will inject a reobservation request signed by Guardian 1, @@ -509,6 +515,34 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func createGovernanceMsgAndVaa(t testing.TB) (*common.MessagePublication, *nodev1.GovernanceMessage) { + t.Helper() + msgGov := someMessage() + msgGov.EmitterAddress = vaa.GovernanceEmitter + msgGov.EmitterChain = vaa.GovernanceChain + + govMsg := &nodev1.GovernanceMessage{ + Sequence: msgGov.Sequence, + Nonce: msgGov.Nonce, + Payload: &nodev1.GovernanceMessage_GuardianSet{ + GuardianSet: &nodev1.GuardianSetUpdate{ + Guardians: []*nodev1.GuardianSetUpdate_Guardian{ + { + Pubkey: "0x187727CdD17C8142FE9b29A066F577548423aF0e", + Name: "P2P Validator", + }, + }, + }, + }, + } + govVaa, err := adminrpc.GovMsgToVaa(govMsg, guardianSetIndex, msgGov.Timestamp) + require.NoError(t, err) + msgGov.Payload = govVaa.Payload + msgGov.ConsistencyLevel = govVaa.ConsistencyLevel + + return msgGov, govMsg +} + // TestConsensus tests that a set of guardians can form consensus on certain messages and reject certain other messages func TestConsensus(t *testing.T) { // adjust processor time intervals to make tests pass faster @@ -523,6 +557,8 @@ func TestConsensus(t *testing.T) { msgGovEmitter := someMessage() msgGovEmitter.EmitterAddress = vaa.GovernanceEmitter + msgGov, msgGovProto := createGovernanceMsgAndVaa(t) + msgWrongEmitterChain := someMessage() msgWrongEmitterChain.EmitterChain = vaa.ChainIDEthereum @@ -585,6 +621,13 @@ func TestConsensus(t *testing.T) { numGuardiansObserve: numGuardians, mustReachQuorum: true, }, + { // Injected governance message + msg: msgGov, + govMsg: msgGovProto, + numGuardiansObserve: 0, + numGuardiansInjectGov: numGuardians, + mustReachQuorum: true, + }, // TODO add a testcase to test the automatic re-observation requests. // Need to refactor various usage of wall time to a mockable time first. E.g. using https://github.com/benbjohnson/clock } @@ -594,7 +637,6 @@ func TestConsensus(t *testing.T) { // runConsensusTests spins up `numGuardians` guardians and runs & verifies the testCases func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) { const testTimeout = time.Second * 30 - const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something) const vaaCheckGuardianIndex uint = 0 // we will query this guardian's publicrpc for VAAs const adminRpcGuardianIndex uint = 0 // we will query this guardian's adminRpc testId := getTestId() @@ -681,27 +723,28 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) { } } - // Wait for adminrpc to come online - for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[adminRpcGuardianIndex].config.adminSocket)).Len() == 0 { - logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...") - time.Sleep(time.Microsecond * 100) - } - - // Send manual re-observation requests + // Do adminrpc stuff: Send manual re-observation requests and perform governance msg injections func() { // put this in own function to use defer - s := fmt.Sprintf("unix:///%s", gs[adminRpcGuardianIndex].config.adminSocket) - conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - defer conn.Close() + // Wait for adminrpc to come online + adminCs := make([]nodev1.NodePrivilegedServiceClient, numGuardians) + for i := 0; i < numGuardians; i++ { + for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[i].config.adminSocket)).Len() == 0 { + logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...") + time.Sleep(time.Microsecond * 100) + } - c := nodev1.NewNodePrivilegedServiceClient(conn) + s := fmt.Sprintf("unix:///%s", gs[i].config.adminSocket) + conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + adminCs[i] = nodev1.NewNodePrivilegedServiceClient(conn) + } for i, testCase := range testCases { if testCase.performManualReobservationRequest { - // timeout for grpc query logger.Info("injecting observation request through admin rpc", zap.Int("test_case", i)) queryCtx, queryCancel := context.WithTimeout(ctx, time.Second) - _, err = c.SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{ + _, err := adminCs[adminRpcGuardianIndex].SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{ ObservationRequest: &gossipv1.ObservationRequest{ ChainId: uint32(testCase.msg.EmitterChain), TxHash: testCase.msg.TxHash[:], @@ -710,6 +753,19 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) { queryCancel() assert.NoError(t, err) } + + for j := 0; j < testCase.numGuardiansInjectGov; j++ { + require.NotNil(t, testCase.govMsg) + logger.Info("injecting message through admin rpc", zap.Int("test_case", i), zap.Int("guardian", j)) + queryCtx, queryCancel := context.WithTimeout(ctx, time.Second) + _, err := adminCs[j].InjectGovernanceVAA(queryCtx, &nodev1.InjectGovernanceVAARequest{ + CurrentSetIndex: guardianSetIndex, + Messages: []*nodev1.GovernanceMessage{testCase.govMsg}, + Timestamp: uint32(testCase.msg.Timestamp.Unix()), + }) + queryCancel() + assert.NoError(t, err) + } } }()