node/node_test: Test governance vaa injection
This commit is contained in:
parent
1cabbe8a2d
commit
2a06fd4668
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/certusone/wormhole/node/pkg/adminrpc"
|
||||
"github.com/certusone/wormhole/node/pkg/common"
|
||||
"github.com/certusone/wormhole/node/pkg/db"
|
||||
"github.com/certusone/wormhole/node/pkg/devnet"
|
||||
|
@ -64,6 +65,8 @@ const WAIT_FOR_METRICS = false
|
|||
// The level at which logs will be written to console; During testing, logs are produced and buffered at Info level, because some tests need to look for certain entries.
|
||||
var CONSOLE_LOG_LEVEL = zap.InfoLevel
|
||||
|
||||
const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something)
|
||||
|
||||
var TEST_ID_CTR atomic.Uint32
|
||||
|
||||
func getTestId() uint {
|
||||
|
@ -342,8 +345,11 @@ func waitForVaa(t testing.TB, ctx context.Context, c publicrpcv1.PublicRPCServic
|
|||
|
||||
type testCase struct {
|
||||
msg *common.MessagePublication // a Wormhole message
|
||||
govMsg *nodev1.GovernanceMessage // protobuf representation of msg as governance message, if applicable.
|
||||
// number of Guardians who will initially observe this message through the mock watcher
|
||||
numGuardiansObserve int
|
||||
// number of Guardians where the governance message will be injected through the adminrpc
|
||||
numGuardiansInjectGov int
|
||||
// if true, Guardians will not observe this message in the mock watcher, if they receive a reobservation request for it
|
||||
unavailableInReobservation bool
|
||||
// if true, the test environment will inject a reobservation request signed by Guardian 1,
|
||||
|
@ -509,6 +515,34 @@ func TestMain(m *testing.M) {
|
|||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
func createGovernanceMsgAndVaa(t testing.TB) (*common.MessagePublication, *nodev1.GovernanceMessage) {
|
||||
t.Helper()
|
||||
msgGov := someMessage()
|
||||
msgGov.EmitterAddress = vaa.GovernanceEmitter
|
||||
msgGov.EmitterChain = vaa.GovernanceChain
|
||||
|
||||
govMsg := &nodev1.GovernanceMessage{
|
||||
Sequence: msgGov.Sequence,
|
||||
Nonce: msgGov.Nonce,
|
||||
Payload: &nodev1.GovernanceMessage_GuardianSet{
|
||||
GuardianSet: &nodev1.GuardianSetUpdate{
|
||||
Guardians: []*nodev1.GuardianSetUpdate_Guardian{
|
||||
{
|
||||
Pubkey: "0x187727CdD17C8142FE9b29A066F577548423aF0e",
|
||||
Name: "P2P Validator",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
govVaa, err := adminrpc.GovMsgToVaa(govMsg, guardianSetIndex, msgGov.Timestamp)
|
||||
require.NoError(t, err)
|
||||
msgGov.Payload = govVaa.Payload
|
||||
msgGov.ConsistencyLevel = govVaa.ConsistencyLevel
|
||||
|
||||
return msgGov, govMsg
|
||||
}
|
||||
|
||||
// TestConsensus tests that a set of guardians can form consensus on certain messages and reject certain other messages
|
||||
func TestConsensus(t *testing.T) {
|
||||
// adjust processor time intervals to make tests pass faster
|
||||
|
@ -523,6 +557,8 @@ func TestConsensus(t *testing.T) {
|
|||
msgGovEmitter := someMessage()
|
||||
msgGovEmitter.EmitterAddress = vaa.GovernanceEmitter
|
||||
|
||||
msgGov, msgGovProto := createGovernanceMsgAndVaa(t)
|
||||
|
||||
msgWrongEmitterChain := someMessage()
|
||||
msgWrongEmitterChain.EmitterChain = vaa.ChainIDEthereum
|
||||
|
||||
|
@ -585,6 +621,13 @@ func TestConsensus(t *testing.T) {
|
|||
numGuardiansObserve: numGuardians,
|
||||
mustReachQuorum: true,
|
||||
},
|
||||
{ // Injected governance message
|
||||
msg: msgGov,
|
||||
govMsg: msgGovProto,
|
||||
numGuardiansObserve: 0,
|
||||
numGuardiansInjectGov: numGuardians,
|
||||
mustReachQuorum: true,
|
||||
},
|
||||
// TODO add a testcase to test the automatic re-observation requests.
|
||||
// Need to refactor various usage of wall time to a mockable time first. E.g. using https://github.com/benbjohnson/clock
|
||||
}
|
||||
|
@ -594,7 +637,6 @@ func TestConsensus(t *testing.T) {
|
|||
// runConsensusTests spins up `numGuardians` guardians and runs & verifies the testCases
|
||||
func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
|
||||
const testTimeout = time.Second * 30
|
||||
const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something)
|
||||
const vaaCheckGuardianIndex uint = 0 // we will query this guardian's publicrpc for VAAs
|
||||
const adminRpcGuardianIndex uint = 0 // we will query this guardian's adminRpc
|
||||
testId := getTestId()
|
||||
|
@ -681,27 +723,28 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
|
|||
}
|
||||
}
|
||||
|
||||
// Do adminrpc stuff: Send manual re-observation requests and perform governance msg injections
|
||||
func() { // put this in own function to use defer
|
||||
// Wait for adminrpc to come online
|
||||
for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[adminRpcGuardianIndex].config.adminSocket)).Len() == 0 {
|
||||
adminCs := make([]nodev1.NodePrivilegedServiceClient, numGuardians)
|
||||
for i := 0; i < numGuardians; i++ {
|
||||
for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[i].config.adminSocket)).Len() == 0 {
|
||||
logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...")
|
||||
time.Sleep(time.Microsecond * 100)
|
||||
}
|
||||
|
||||
// Send manual re-observation requests
|
||||
func() { // put this in own function to use defer
|
||||
s := fmt.Sprintf("unix:///%s", gs[adminRpcGuardianIndex].config.adminSocket)
|
||||
s := fmt.Sprintf("unix:///%s", gs[i].config.adminSocket)
|
||||
conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
c := nodev1.NewNodePrivilegedServiceClient(conn)
|
||||
adminCs[i] = nodev1.NewNodePrivilegedServiceClient(conn)
|
||||
}
|
||||
|
||||
for i, testCase := range testCases {
|
||||
if testCase.performManualReobservationRequest {
|
||||
// timeout for grpc query
|
||||
logger.Info("injecting observation request through admin rpc", zap.Int("test_case", i))
|
||||
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
|
||||
_, err = c.SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{
|
||||
_, err := adminCs[adminRpcGuardianIndex].SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{
|
||||
ObservationRequest: &gossipv1.ObservationRequest{
|
||||
ChainId: uint32(testCase.msg.EmitterChain),
|
||||
TxHash: testCase.msg.TxHash[:],
|
||||
|
@ -710,6 +753,19 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
|
|||
queryCancel()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
for j := 0; j < testCase.numGuardiansInjectGov; j++ {
|
||||
require.NotNil(t, testCase.govMsg)
|
||||
logger.Info("injecting message through admin rpc", zap.Int("test_case", i), zap.Int("guardian", j))
|
||||
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
|
||||
_, err := adminCs[j].InjectGovernanceVAA(queryCtx, &nodev1.InjectGovernanceVAARequest{
|
||||
CurrentSetIndex: guardianSetIndex,
|
||||
Messages: []*nodev1.GovernanceMessage{testCase.govMsg},
|
||||
Timestamp: uint32(testCase.msg.Timestamp.Unix()),
|
||||
})
|
||||
queryCancel()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
Loading…
Reference in New Issue