Node: p2p.Run interface change (#3996)

* Node: p2p.Run interface change

* Guardian should be able to set disableHeartbeatVerify
This commit is contained in:
bruce-riley 2024-07-01 10:01:13 -05:00 committed by GitHub
parent fdbfc0b085
commit d146f82bca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 566 additions and 167 deletions

View File

@ -337,9 +337,6 @@ func runSpy(cmd *cobra.Command, args []string) {
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()
// Outbound gossip message queue
sendC := make(chan []byte)
// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 1024)
@ -391,35 +388,21 @@ func runSpy(cmd *cobra.Command, args []string) {
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
components := p2p.DefaultComponents()
components.Port = *p2pPort
params, err := p2p.NewRunParams(
*p2pBootstrap,
*p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithSignedVAAListener(signedInC),
)
if err != nil {
return err
}
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(nil, // Ignore incoming observations.
nil, // Ignore observation requests.
nil,
sendC,
signedInC,
priv,
nil,
gst,
*p2pNetworkID,
*p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
nil,
nil,
components,
nil, // ibc feature string
false, // gateway relayer enabled
false, // ccqEnabled
nil, // query requests
nil, // query responses
"", // query bootstrap peers
0, // query port
"", // query allow list
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

View File

@ -55,33 +55,39 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers,
// Add the gossip advertisement address
components.GossipAdvertiseAddress = gossipAdvertiseAddress
g.runnables["p2p"] = p2p.Run(
g.obsvC,
g.obsvReqC.writeC,
g.obsvReqSendC.readC,
g.gossipSendC,
g.signedInC.writeC,
p2pKey,
g.gk,
g.gst,
networkId,
params, err := p2p.NewRunParams(
bootstrapPeers,
nodeName,
disableHeartbeatVerify,
networkId,
p2pKey,
g.gst,
g.rootCtxCancel,
g.acct,
g.gov,
nil,
nil,
components,
ibcFeaturesFunc,
(g.gatewayRelayer != nil),
(g.queryHandler != nil),
g.signedQueryReqC.writeC,
g.queryResponsePublicationC.readC,
ccqBootstrapPeers,
ccqPort,
ccqAllowedPeers,
p2p.WithGuardianOptions(
nodeName,
g.gk,
g.obsvC,
g.signedInC.writeC,
g.obsvReqC.writeC,
g.gossipSendC,
g.obsvReqSendC.readC,
g.acct,
g.gov,
disableHeartbeatVerify,
components,
ibcFeaturesFunc,
(g.gatewayRelayer != nil), // gatewayRelayerEnabled,
(g.queryHandler != nil), // ccqEnabled,
g.signedQueryReqC.writeC,
g.queryResponsePublicationC.readC,
ccqBootstrapPeers,
ccqPort,
ccqAllowedPeers),
)
if err != nil {
return err
}
g.runnables["p2p"] = p2p.Run(
params,
)
return nil

View File

@ -10,10 +10,7 @@ import (
"sync"
"time"
"github.com/certusone/wormhole/node/pkg/accountant"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/certusone/wormhole/node/pkg/version"
eth_common "github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
@ -293,36 +290,14 @@ func NewHost(logger *zap.Logger, ctx context.Context, networkID string, bootstra
return h, err
}
func Run(
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
obsvReqC chan<- *gossipv1.ObservationRequest,
obsvReqSendC <-chan *gossipv1.ObservationRequest,
gossipSendC chan []byte,
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
priv crypto.PrivKey,
gk *ecdsa.PrivateKey,
gst *common.GuardianSetState,
networkID string,
bootstrapPeers string,
nodeName string,
disableHeartbeatVerify bool,
rootCtxCancel context.CancelFunc,
acct *accountant.Accountant,
gov *governor.ChainGovernor,
signedGovCfg chan *gossipv1.SignedChainGovernorConfig,
signedGovSt chan *gossipv1.SignedChainGovernorStatus,
components *Components,
ibcFeaturesFunc func() string,
gatewayRelayerEnabled bool,
ccqEnabled bool,
signedQueryReqC chan<- *gossipv1.SignedQueryRequest,
queryResponseReadC <-chan *query.QueryResponsePublication,
ccqBootstrapPeers string,
ccqPort uint,
ccqAllowedPeers string,
) func(ctx context.Context) error {
if components == nil {
components = DefaultComponents()
func Run(params *RunParams) func(ctx context.Context) error {
if params == nil {
return func(ctx context.Context) error {
return errors.New("params may not be nil")
}
}
if params.components == nil {
params.components = DefaultComponents()
}
return func(ctx context.Context) error {
@ -336,10 +311,10 @@ func Run(
// TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted.
// But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background.
logger.Warn("p2p routine has exited, cancelling root context...")
rootCtxCancel()
params.rootCtxCancel()
}()
h, err := NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
h, err := NewHost(logger, ctx, params.networkID, params.bootstrapPeers, params.components, params.priv)
if err != nil {
panic(err)
}
@ -355,15 +330,15 @@ func Run(
panic(err)
}
topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
topic := fmt.Sprintf("%s/%s", params.networkID, "broadcast")
bootstrappers, bootstrapNode := BootstrapAddrs(logger, bootstrapPeers, h.ID())
bootstrappers, bootstrapNode := BootstrapAddrs(logger, params.bootstrapPeers, h.ID())
if bootstrapNode {
logger.Info("We are a bootstrap node.")
if networkID == "/wormhole/testnet/2/1" {
components.GossipParams.Dhi = TESTNET_BOOTSTRAP_DHI
logger.Info("We are a bootstrap node in Testnet. Setting gossipParams.Dhi.", zap.Int("gossipParams.Dhi", components.GossipParams.Dhi))
if params.networkID == "/wormhole/testnet/2/1" {
params.components.GossipParams.Dhi = TESTNET_BOOTSTRAP_DHI
logger.Info("We are a bootstrap node in Testnet. Setting gossipParams.Dhi.", zap.Int("gossipParams.Dhi", params.components.GossipParams.Dhi))
}
}
@ -371,7 +346,7 @@ func Run(
ourTracer := &traceHandler{}
ps, err := pubsub.NewGossipSub(ctx, h,
pubsub.WithValidateQueueSize(P2P_VALIDATE_QUEUE_SIZE),
pubsub.WithGossipSubParams(components.GossipParams),
pubsub.WithGossipSubParams(params.components.GossipParams),
pubsub.WithEventTracer(ourTracer),
// TODO: Investigate making this change. May need to use LaxSign until everyone has upgraded to that.
// pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
@ -416,10 +391,10 @@ func Run(
bootTime := time.Now()
if ccqEnabled {
if params.ccqEnabled {
ccqErrC := make(chan error)
ccq := newCcqRunP2p(logger, ccqAllowedPeers, components)
if err := ccq.run(ctx, priv, gk, networkID, ccqBootstrapPeers, ccqPort, signedQueryReqC, queryResponseReadC, ccqErrC); err != nil {
ccq := newCcqRunP2p(logger, params.ccqAllowedPeers, params.components)
if err := ccq.run(ctx, params.priv, params.gk, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, ccqErrC); err != nil {
return fmt.Errorf("failed to start p2p for CCQ: %w", err)
}
defer ccq.close()
@ -430,7 +405,7 @@ func Run(
return
case ccqErr := <-ccqErrC:
logger.Error("ccqp2p returned an error", zap.Error(ccqErr), zap.String("component", "ccqp2p"))
rootCtxCancel()
params.rootCtxCancel()
return
}
}
@ -444,7 +419,7 @@ func Run(
for {
select {
case <-ticker.C:
gst.Cleanup()
params.gst.Cleanup()
case <-ctx.Done():
return
}
@ -453,10 +428,10 @@ func Run(
go func() {
// Disable heartbeat when no node name is provided (spy mode)
if nodeName == "" {
if params.nodeName == "" {
return
}
ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey)
ctr := int64(0)
// Guardians should send out their first heartbeat immediately to speed up test runs.
@ -483,27 +458,27 @@ func Run(
}
features := make([]string, 0)
if gov != nil {
if params.gov != nil {
features = append(features, "governor")
}
if acct != nil {
features = append(features, acct.FeatureString())
if params.acct != nil {
features = append(features, params.acct.FeatureString())
}
if ibcFeaturesFunc != nil {
ibcFlags := ibcFeaturesFunc()
if params.ibcFeaturesFunc != nil {
ibcFlags := params.ibcFeaturesFunc()
if ibcFlags != "" {
features = append(features, ibcFlags)
}
}
if gatewayRelayerEnabled {
if params.gatewayRelayerEnabled {
features = append(features, "gwrelayer")
}
if ccqEnabled {
if params.ccqEnabled {
features = append(features, "ccq")
}
heartbeat := &gossipv1.Heartbeat{
NodeName: nodeName,
NodeName: params.nodeName,
Counter: ctr,
Timestamp: time.Now().UnixNano(),
Networks: networks,
@ -513,22 +488,22 @@ func Run(
Features: features,
}
if components.P2PIDInHeartbeat {
if params.components.P2PIDInHeartbeat {
heartbeat.P2PNodeId = nodeIdBytes
}
if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
panic(err)
}
collectNodeMetrics(ourAddr, h.ID(), heartbeat)
if gov != nil {
gov.CollectMetrics(heartbeat, gossipSendC, gk, ourAddr)
if params.gov != nil {
params.gov.CollectMetrics(heartbeat, params.gossipSendC, params.gk, ourAddr)
}
msg := gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedHeartbeat{
SignedHeartbeat: createSignedHeartbeat(gk, heartbeat),
SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat),
},
}
@ -555,13 +530,13 @@ func Run(
select {
case <-ctx.Done():
return
case msg := <-gossipSendC:
case msg := <-params.gossipSendC:
err := th.Publish(ctx, msg)
p2pMessagesSent.Inc()
if err != nil {
logger.Error("failed to publish message from queue", zap.Error(err))
}
case msg := <-obsvReqSendC:
case msg := <-params.obsvReqSendC:
b, err := proto.Marshal(msg)
if err != nil {
panic(err)
@ -569,7 +544,7 @@ func Run(
// Sign the observation request using our node's guardian key.
digest := signedObservationRequestDigest(b)
sig, err := ethcrypto.Sign(digest.Bytes(), gk)
sig, err := ethcrypto.Sign(digest.Bytes(), params.gk)
if err != nil {
panic(err)
}
@ -577,7 +552,7 @@ func Run(
sReq := &gossipv1.SignedObservationRequest{
ObservationRequest: b,
Signature: sig,
GuardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey).Bytes(),
GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(),
}
envelope := &gossipv1.GossipMessage{
@ -590,8 +565,8 @@ func Run(
}
// Send to local observation request queue (the loopback message is ignored)
if obsvReqC != nil {
obsvReqC <- msg
if params.obsvReqC != nil {
params.obsvReqC <- msg
}
err = th.Publish(ctx, b)
@ -639,17 +614,17 @@ func Run(
switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedHeartbeat:
s := m.SignedHeartbeat
gs := gst.Get()
gs := params.gst.Get()
if gs == nil {
// No valid guardian set yet - dropping heartbeat
logger.Log(components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set",
logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set",
zap.Any("value", s),
zap.String("from", envelope.GetFrom().String()))
break
}
if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil {
if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil {
p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
logger.Log(components.SignedHeartbeatLogLevel, "invalid signed heartbeat received",
logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
@ -657,14 +632,14 @@ func Run(
zap.String("from", envelope.GetFrom().String()))
} else {
p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
logger.Log(components.SignedHeartbeatLogLevel, "valid signed heartbeat received",
logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received",
zap.Any("value", heartbeat),
zap.String("from", envelope.GetFrom().String()))
func() {
if len(heartbeat.P2PNodeId) != 0 {
components.ProtectedHostByGuardianKeyLock.Lock()
defer components.ProtectedHostByGuardianKeyLock.Unlock()
params.components.ProtectedHostByGuardianKeyLock.Lock()
defer params.components.ProtectedHostByGuardianKeyLock.Unlock()
var peerId peer.ID
if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil {
logger.Error("p2p_node_id_in_heartbeat_invalid",
@ -674,8 +649,8 @@ func Run(
zap.String("from", envelope.GetFrom().String()))
} else {
guardianAddr := eth_common.BytesToAddress(s.GuardianAddr)
if gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(gk.PublicKey) {
prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr]
if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) {
prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr]
if ok {
if prevPeerId != peerId {
logger.Info("p2p_guardian_peer_changed",
@ -683,13 +658,13 @@ func Run(
zap.String("prevPeerId", prevPeerId.String()),
zap.String("newPeerId", peerId.String()),
)
components.ConnMgr.Unprotect(prevPeerId, "heartbeat")
components.ConnMgr.Protect(peerId, "heartbeat")
components.ProtectedHostByGuardianKey[guardianAddr] = peerId
params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat")
params.components.ConnMgr.Protect(peerId, "heartbeat")
params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
}
} else {
components.ConnMgr.Protect(peerId, "heartbeat")
components.ProtectedHostByGuardianKey[guardianAddr] = peerId
params.components.ConnMgr.Protect(peerId, "heartbeat")
params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
}
}
}
@ -701,23 +676,23 @@ func Run(
}()
}
case *gossipv1.GossipMessage_SignedObservation:
if obsvC != nil {
if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, obsvC); err == nil {
if params.obsvC != nil {
if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil {
p2pMessagesReceived.WithLabelValues("observation").Inc()
} else {
if components.WarnChannelOverflow {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
if signedInC != nil {
if params.signedInC != nil {
select {
case signedInC <- m.SignedVaaWithQuorum:
case params.signedInC <- m.SignedVaaWithQuorum:
p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
default:
if components.WarnChannelOverflow {
if params.components.WarnChannelOverflow {
// TODO do not log this in production
var hexStr string
if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil {
@ -729,9 +704,9 @@ func Run(
}
}
case *gossipv1.GossipMessage_SignedObservationRequest:
if obsvReqC != nil {
if params.obsvReqC != nil {
s := m.SignedObservationRequest
gs := gst.Get()
gs := params.gst.Get()
if gs == nil {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String()))
@ -755,7 +730,7 @@ func Run(
}
select {
case obsvReqC <- r:
case params.obsvReqC <- r:
p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
default:
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
@ -763,12 +738,12 @@ func Run(
}
}
case *gossipv1.GossipMessage_SignedChainGovernorConfig:
if signedGovCfg != nil {
signedGovCfg <- m.SignedChainGovernorConfig
if params.signedGovCfg != nil {
params.signedGovCfg <- m.SignedChainGovernorConfig
}
case *gossipv1.GossipMessage_SignedChainGovernorStatus:
if signedGovSt != nil {
signedGovSt <- m.SignedChainGovernorStatus
if params.signedGovSt != nil {
params.signedGovSt <- m.SignedChainGovernorStatus
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()

220
node/pkg/p2p/run_params.go Normal file
View File

@ -0,0 +1,220 @@
package p2p
import (
"context"
"crypto/ecdsa"
"errors"
"github.com/certusone/wormhole/node/pkg/accountant"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/governor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/libp2p/go-libp2p/core/crypto"
)
type (
// RunParams is used to pass parameters into `p2p.Run()`. It allows applications to specify only what they need.
RunParams struct {
// These parameters are always required.
bootstrapPeers string
networkID string
priv crypto.PrivKey
gst *common.GuardianSetState
rootCtxCancel context.CancelFunc
// obsvC is optional and can be set with `WithSignedObservationListener`.
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// obsvReqC is optional and can be set with `WithObservationRequestListener`.
obsvReqC chan<- *gossipv1.ObservationRequest
// signedInC is optional and can be set with `WithSignedVAAListener`.
signedInC chan<- *gossipv1.SignedVAAWithQuorum
// signedGovCfg is optional and can be set with `WithChainGovernorConfigListener`.
signedGovCfg chan *gossipv1.SignedChainGovernorConfig
// WithChainGovernorStatusListener is optional and can be set with `WithChainGovernorStatusListener`.
signedGovSt chan *gossipv1.SignedChainGovernorStatus
// disableHeartbeatVerify is optional and can be set with `WithDisableHeartbeatVerify` or `WithGuardianOptions`.
disableHeartbeatVerify bool
// The following options are guardian specific. Set with `WithGuardianOptions`.
nodeName string
gk *ecdsa.PrivateKey
gossipSendC chan []byte
obsvReqSendC <-chan *gossipv1.ObservationRequest
acct *accountant.Accountant
gov *governor.ChainGovernor
components *Components
ibcFeaturesFunc func() string
gatewayRelayerEnabled bool
ccqEnabled bool
signedQueryReqC chan<- *gossipv1.SignedQueryRequest
queryResponseReadC <-chan *query.QueryResponsePublication
ccqBootstrapPeers string
ccqPort uint
ccqAllowedPeers string
}
// RunOpt is used to specify optional parameters.
RunOpt func(p *RunParams) error
)
// NewRunParams is used to create the `RunParams` which gets passed into `p2p.Run()`. It takes the required parameters,
// plus any desired optional ones, which can be set using the various `With` functions defined below.
func NewRunParams(
bootstrapPeers string,
networkID string,
priv crypto.PrivKey,
gst *common.GuardianSetState,
rootCtxCancel context.CancelFunc,
opts ...RunOpt,
) (*RunParams, error) {
p := &RunParams{
bootstrapPeers: bootstrapPeers,
networkID: networkID,
priv: priv,
gst: gst,
rootCtxCancel: rootCtxCancel,
}
for _, opt := range opts {
err := opt(p)
if err != nil {
return nil, err
}
}
if err := p.verify(); err != nil {
return nil, err
}
return p, nil
}
// WithSignedObservationListener is used to set the channel to receive `SignedObservation“ messages.
func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
return func(p *RunParams) error {
p.obsvC = obsvC
return nil
}
}
// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages.
func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
return func(p *RunParams) error {
p.signedInC = signedInC
return nil
}
}
// WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages.
func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt {
return func(p *RunParams) error {
p.obsvReqC = obsvReqC
return nil
}
}
// WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages.
func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt {
return func(p *RunParams) error {
p.signedGovCfg = signedGovCfg
return nil
}
}
// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages.
func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt {
return func(p *RunParams) error {
p.signedGovSt = signedGovSt
return nil
}
}
// WithDisableHeartbeatVerify is used to set disableHeartbeatVerify.
func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
return func(p *RunParams) error {
p.disableHeartbeatVerify = disableHeartbeatVerify
return nil
}
}
// WithGuardianOptions is used to set options that are only meaningful to the guardian.
func WithGuardianOptions(
nodeName string,
gk *ecdsa.PrivateKey,
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqC chan<- *gossipv1.ObservationRequest,
gossipSendC chan []byte,
obsvReqSendC <-chan *gossipv1.ObservationRequest,
acct *accountant.Accountant,
gov *governor.ChainGovernor,
disableHeartbeatVerify bool,
components *Components,
ibcFeaturesFunc func() string,
gatewayRelayerEnabled bool,
ccqEnabled bool,
signedQueryReqC chan<- *gossipv1.SignedQueryRequest,
queryResponseReadC <-chan *query.QueryResponsePublication,
ccqBootstrapPeers string,
ccqPort uint,
ccqAllowedPeers string,
) RunOpt {
return func(p *RunParams) error {
p.nodeName = nodeName
p.gk = gk
p.obsvC = obsvC
p.signedInC = signedInC
p.obsvReqC = obsvReqC
p.gossipSendC = gossipSendC
p.obsvReqSendC = obsvReqSendC
p.acct = acct
p.gov = gov
p.disableHeartbeatVerify = disableHeartbeatVerify
p.components = components
p.ibcFeaturesFunc = ibcFeaturesFunc
p.gatewayRelayerEnabled = gatewayRelayerEnabled
p.ccqEnabled = ccqEnabled
p.signedQueryReqC = signedQueryReqC
p.queryResponseReadC = queryResponseReadC
p.ccqBootstrapPeers = ccqBootstrapPeers
p.ccqPort = ccqPort
p.ccqAllowedPeers = ccqAllowedPeers
return nil
}
}
// verify is used to verify the RunParams object.
func (p *RunParams) verify() error {
if p.bootstrapPeers == "" {
return errors.New("bootstrapPeers may not be nil")
}
if p.networkID == "" {
return errors.New("networkID may not be nil")
}
if p.priv == nil {
return errors.New("priv may not be nil")
}
if p.gst == nil {
return errors.New("gst may not be nil")
}
if p.rootCtxCancel == nil {
return errors.New("rootCtxCancel may not be nil")
}
if p.nodeName != "" { // Heartbeating is enabled.
if p.gk == nil {
return errors.New("if heart beating is enabled, gk may not be nil")
}
}
if p.obsvReqSendC != nil {
if p.gk == nil {
return errors.New("if obsvReqSendC is not nil, gk may not be nil")
}
}
return nil
}

View File

@ -0,0 +1,210 @@
package p2p
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"testing"
"github.com/certusone/wormhole/node/pkg/accountant"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/governor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/ethereum/go-ethereum/crypto"
p2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const bootstrapPeers = "/dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu"
const networkId = "/wormhole/dev"
const nodeName = "guardian-0"
func TestRunParamsBootstrapPeersRequired(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
params, err := NewRunParams(
"", // bootstrapPeers,
networkId,
priv,
gst,
rootCtxCancel,
)
require.ErrorContains(t, err, "bootstrapPeers may not be nil")
require.Nil(t, params)
}
func TestRunParamsNetworkIdRequired(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
params, err := NewRunParams(
bootstrapPeers,
"", // networkId,
priv,
gst,
rootCtxCancel,
)
require.ErrorContains(t, err, "networkID may not be nil")
require.Nil(t, params)
}
func TestRunParamsPrivRequired(t *testing.T) {
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
params, err := NewRunParams(
bootstrapPeers,
networkId,
nil, // priv,
gst,
rootCtxCancel,
)
require.ErrorContains(t, err, "priv may not be nil")
require.Nil(t, params)
}
func TestRunParamsGstRequired(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
nil, // gst,
rootCtxCancel,
)
require.ErrorContains(t, err, "gst may not be nil")
require.Nil(t, params)
}
func TestRunParamsRootCtxCancelRequired(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
gst,
nil, // rootCtxCancel,
)
require.ErrorContains(t, err, "rootCtxCancel may not be nil")
require.Nil(t, params)
}
func TestRunParamsWithDisableHeartbeatVerify(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
gst,
rootCtxCancel,
WithDisableHeartbeatVerify(true),
)
require.NoError(t, err)
require.NotNil(t, params)
assert.True(t, params.disableHeartbeatVerify)
}
func TestRunParamsWithGuardianOptions(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
gk, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
require.NoError(t, err)
require.NotNil(t, gk)
obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
gossipSendC := make(chan []byte, 42)
obsvReqSendC := make(<-chan *gossipv1.ObservationRequest, 42)
acct := &accountant.Accountant{}
gov := &governor.ChainGovernor{}
disableHeartbeatVerify := false
components := &Components{}
ibcFeaturesFunc := func() string { return "Hello, World!" }
gatewayRelayerEnabled := true
ccqEnabled := true
signedQueryReqC := make(chan<- *gossipv1.SignedQueryRequest, 42)
queryResponseReadC := make(<-chan *query.QueryResponsePublication, 42)
ccqBootstrapPeers := "some bootstrap string"
ccqPort := uint(4242)
ccqAllowedPeers := "some allowed peers"
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
gst,
rootCtxCancel,
WithGuardianOptions(
nodeName,
gk,
obsvC,
signedInC,
obsvReqC,
gossipSendC,
obsvReqSendC,
acct,
gov,
disableHeartbeatVerify,
components,
ibcFeaturesFunc,
gatewayRelayerEnabled,
ccqEnabled,
signedQueryReqC,
queryResponseReadC,
ccqBootstrapPeers,
ccqPort,
ccqAllowedPeers),
)
require.NoError(t, err)
require.NotNil(t, params)
assert.Equal(t, nodeName, params.nodeName)
assert.Equal(t, obsvC, params.obsvC)
assert.Equal(t, signedInC, params.signedInC)
assert.Equal(t, obsvReqC, params.obsvReqC)
assert.Equal(t, gossipSendC, params.gossipSendC)
assert.Equal(t, obsvReqSendC, params.obsvReqSendC)
assert.Equal(t, acct, params.acct)
assert.Equal(t, gov, params.gov)
assert.Equal(t, components, params.components)
assert.NotNil(t, params.ibcFeaturesFunc) // Can't compare function pointers, so just verify it's set.
assert.True(t, params.gatewayRelayerEnabled)
assert.True(t, params.ccqEnabled)
assert.Equal(t, signedQueryReqC, params.signedQueryReqC)
assert.Equal(t, queryResponseReadC, params.queryResponseReadC)
assert.Equal(t, ccqBootstrapPeers, params.ccqBootstrapPeers)
assert.Equal(t, ccqPort, params.ccqPort)
assert.Equal(t, ccqAllowedPeers, params.ccqAllowedPeers)
}

View File

@ -61,6 +61,8 @@ func NewG(t *testing.T, nodeName string) *G {
panic(err)
}
_, rootCtxCancel := context.WithCancel(context.Background())
g := &G{
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
@ -72,7 +74,7 @@ func NewG(t *testing.T, nodeName string) *G {
gst: node_common.NewGuardianSetState(nil),
nodeName: nodeName,
disableHeartbeatVerify: false,
rootCtxCancel: nil,
rootCtxCancel: rootCtxCancel,
gov: nil,
signedGovCfg: make(chan *gossipv1.SignedChainGovernorConfig, cs),
signedGovSt: make(chan *gossipv1.SignedChainGovernorStatus, cs),
@ -164,32 +166,35 @@ func TestWatermark(t *testing.T) {
func startGuardian(t *testing.T, ctx context.Context, g *G) {
t.Helper()
supervisor.New(ctx, zap.L(),
Run(g.obsvC,
g.obsvReqC,
g.obsvReqSendC,
g.sendC,
g.signedInC,
g.priv,
g.gk,
g.gst,
g.networkID,
g.bootstrapPeers,
params, err := NewRunParams(
g.bootstrapPeers,
g.networkID,
g.priv,
g.gst,
g.rootCtxCancel,
WithGuardianOptions(
g.nodeName,
g.disableHeartbeatVerify,
g.rootCtxCancel,
g.gk,
g.obsvC,
g.signedInC,
g.obsvReqC,
g.sendC,
g.obsvReqSendC,
g.acct,
g.gov,
g.signedGovCfg,
g.signedGovSt,
g.disableHeartbeatVerify,
g.components,
nil, // ibc feature string
nil, //g.ibcFeaturesFunc,
false, // gateway relayer enabled
false, // ccqEnabled
nil, // signed query request channel
nil, // query response channel
"", // query bootstrap peers
0, // query port
"", // query allowed peers
"", // query allowed peers),
))
require.NoError(t, err)
supervisor.New(ctx, zap.L(),
Run(params))
}