node: refactor guardiand/node.go

This commit is contained in:
tbjump 2023-05-24 21:23:42 +00:00 committed by tbjump
parent a1ce981b49
commit 149d898ac6
30 changed files with 2063 additions and 686 deletions

View File

@ -325,7 +325,7 @@ jobs:
go-version: "1.20.5"
# The go-ethereum and celo-blockchain packages both implement secp256k1 using the exact same header, but that causes duplicate symbols.
- name: Run golang tests
run: cd node && go test -v -timeout 1m -race -ldflags '-extldflags "-Wl,--allow-multiple-definition" ' ./...
run: cd node && go test -v -timeout 5m -race -ldflags '-extldflags "-Wl,--allow-multiple-definition" ' ./...
# Run Rust lints and tests
rust-lint-and-tests:

File diff suppressed because it is too large

View File

@ -12,7 +12,7 @@ require (
github.com/ethereum/go-ethereum v1.10.21
github.com/gagliardetto/solana-go v1.7.1
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/common"
)
@ -155,3 +156,20 @@ func (msg *MessagePublication) CreateDigest() string {
db := v.SigningDigest()
return hex.EncodeToString(db.Bytes())
}
// ZapFields appends zap fields describing the message to the given fields. Example usage:
// `logger.Info("logging something with a message", msg.ZapFields(zap.Int("some_other_field", 100))...)`
// TODO: refactor the codebase to use this function instead of manually logging the message with inconsistent fields
func (msg *MessagePublication) ZapFields(fields ...zap.Field) []zap.Field {
return append(fields,
zap.Stringer("tx", msg.TxHash),
zap.Time("timestamp", msg.Timestamp),
zap.Uint32("nonce", msg.Nonce),
zap.Uint64("seq", msg.Sequence),
zap.Uint8("consistency", msg.ConsistencyLevel),
zap.Stringer("emitter_chain", msg.EmitterChain),
zap.Stringer("emitter_address", msg.EmitterAddress),
zap.String("message_id", string(msg.MessageID())),
zap.Bool("unreliable", msg.Unreliable),
)
}
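A minimal usage sketch of ZapFields from a caller's perspective; the helper function and the extra `retry_count` field below are hypothetical and only illustrate how call-site fields combine with the message fields:

package example

import (
	"github.com/certusone/wormhole/node/pkg/common"
	"go.uber.org/zap"
)

// logObservation is a hypothetical helper: it logs one line carrying the
// message's standard fields plus a call-site specific field, all supplied
// through ZapFields.
func logObservation(logger *zap.Logger, msg *common.MessagePublication) {
	logger.Info("processing observation", msg.ZapFields(zap.Int("retry_count", 3))...)
}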

View File

@ -3,6 +3,7 @@ package common
import (
"context"
"fmt"
"sync"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
@ -18,6 +19,10 @@ import (
type GrpcLogDetail string
// initMutex is used during initialization to prevent concurrent writes to the prometheus registry.
// This is only relevant during testing of node/node.go where multiple guardians are created in the same process.
var initMutex sync.Mutex
const (
GrpcLogDetailNone GrpcLogDetail = "none"
GrpcLogDetailMinimal GrpcLogDetail = "minimal"
@ -86,6 +91,9 @@ func requestPayloadServerInterceptor(ctx context.Context, req interface{}, info
}
func NewInstrumentedGRPCServer(logger *zap.Logger, rpcLogDetail GrpcLogDetail) *grpc.Server {
initMutex.Lock()
defer initMutex.Unlock()
streamInterceptors := []grpc.StreamServerInterceptor{
grpc_ctxtags.StreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor,

View File

@ -13,7 +13,7 @@ const (
)
// MustRegisterReadinessSyncing registers the specified chain for readiness syncing. It panics if the chain ID is invalid so it should only be used during initialization.
// This function will
// TODO: Using vaa.ChainID is bad here because there can be multiple watchers for the same chainId, e.g. solana-finalized and solana-confirmed. This is currently handled as a special case for solana in node/node.go, but should really be fixed here.
func MustRegisterReadinessSyncing(chainID vaa.ChainID) {
readiness.RegisterComponent(MustConvertChainIdToReadinessSyncing(chainID))
}

21
node/pkg/db/open.go Normal file
View File

@ -0,0 +1,21 @@
package db
import (
"os"
"path"
"go.uber.org/zap"
)
func OpenDb(logger *zap.Logger, dataDir *string) *Database {
dbPath := path.Join(*dataDir, "db")
if err := os.MkdirAll(dbPath, 0700); err != nil {
logger.Fatal("failed to create database directory", zap.Error(err))
}
db, err := Open(dbPath)
if err != nil {
logger.Fatal("failed to open database", zap.Error(err))
}
return db
}
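For illustration, a rough sketch of how a caller might use the new OpenDb helper; the data directory path and function name here are made up:

package example

import (
	"github.com/certusone/wormhole/node/pkg/db"
	"go.uber.org/zap"
)

// openExampleDb shows the intended call pattern: OpenDb creates <dataDir>/db
// if needed and terminates the process via logger.Fatal on failure.
func openExampleDb() {
	logger, _ := zap.NewDevelopment()
	dataDir := "/tmp/wormhole_example" // hypothetical data directory
	database := db.OpenDb(logger, &dataDir)
	defer database.Close()
}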

View File

@ -25,7 +25,7 @@ import (
ethcrypto "github.com/ethereum/go-ethereum/crypto"
)
func AdminServiceRunnable(
func adminServiceRunnable(
logger *zap.Logger,
socketPath string,
injectC chan<- *vaa.VAA,

589
node/pkg/node/node.go Normal file
View File

@ -0,0 +1,589 @@
package node
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"net/http"
"github.com/benbjohnson/clock"
"github.com/certusone/wormhole/node/pkg/accountant"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/p2p"
"github.com/certusone/wormhole/node/pkg/processor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/ibc"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/certusone/wormhole/node/pkg/wormconn"
"github.com/gorilla/mux"
libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const (
inboundObservationBufferSize = 50
inboundSignedVaaBufferSize = 50
observationRequestOutboundBufferSize = 50
observationRequestInboundBufferSize = 50
// observationRequestBufferSize is the buffer size of the per-network reobservation channel
observationRequestBufferSize = 25
)
type PrometheusCtxKey struct{}
type GuardianOption struct {
name string
dependencies []string // Array of other option's `name`. These options need to be configured before this option. Dependencies are enforced at runtime.
f func(context.Context, *zap.Logger, *G) error // Function that is run by the constructor to initialize this component.
}
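As a rough sketch of the pattern above (assuming it lives in the same node package), a hypothetical option whose name "example" is invented; it declares a dependency on the real "governor" option and does its setup in f:

// GuardianOptionExample is a hypothetical sketch of an option: it has a unique
// name, requires the "governor" option to be configured before it, and runs
// its setup logic against the shared G state.
func GuardianOptionExample() *GuardianOption {
	return &GuardianOption{
		name:         "example",
		dependencies: []string{"governor"},
		f: func(ctx context.Context, logger *zap.Logger, g *G) error {
			logger.Info("configuring example component")
			return nil
		},
	}
}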
// GuardianOptionP2P configures p2p networking.
// Dependencies: Accountant, Governor
func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, port uint, ibcFeaturesFunc func() string) *GuardianOption {
return &GuardianOption{
name: "p2p",
dependencies: []string{"accountant", "governor"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
components := p2p.DefaultComponents()
components.Port = port
g.runnables["p2p"] = p2p.Run(
g.obsvC,
g.obsvReqC.writeC,
g.obsvReqSendC.readC,
g.gossipSendC,
g.signedInC.writeC,
p2pKey,
g.gk,
g.gst,
networkId,
bootstrapPeers,
nodeName,
disableHeartbeatVerify,
g.rootCtxCancel,
g.acct,
g.gov,
nil,
nil,
components,
ibcFeaturesFunc,
)
return nil
}}
}
// GuardianOptionAccountant configures the Accountant module.
// Requires: wormchainConn
func GuardianOptionAccountant(contract string, websocket string, enforcing bool) *GuardianOption {
return &GuardianOption{
name: "accountant",
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
// Set up the accountant. If the accountant smart contract is configured, we will instantiate the accountant and VAAs
// will be passed to it for processing. It will forward all token bridge transfers to the accountant contract.
// If accountantCheckEnabled is set to true, token bridge transfers will not be signed and published until they
// are approved by the accountant smart contract.
if contract == "" {
logger.Info("acct: accountant is disabled", zap.String("component", "gacct"))
return nil
}
if websocket == "" {
return errors.New("acct: if accountantContract is specified, accountantWS is required")
}
if g.wormchainConn == nil {
return errors.New("acct: if accountantContract is specified, the wormchain sending connection must be enabled before.")
}
if enforcing {
logger.Info("acct: accountant is enabled and will be enforced", zap.String("component", "gacct"))
} else {
logger.Info("acct: accountant is enabled but will not be enforced", zap.String("component", "gacct"))
}
g.acct = accountant.NewAccountant(
ctx,
logger,
g.db,
g.obsvReqC.writeC,
contract,
websocket,
g.wormchainConn,
enforcing,
g.gk,
g.gst,
g.acctC.writeC,
g.env,
)
return nil
}}
}
func GuardianOptionGovernor(governorEnabled bool) *GuardianOption {
return &GuardianOption{
name: "governor",
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
if governorEnabled {
logger.Info("chain governor is enabled")
g.gov = governor.NewChainGovernor(logger, g.db, g.env)
} else {
logger.Info("chain governor is disabled")
}
return nil
}}
}
func GuardianOptionStatusServer(statusAddr string) *GuardianOption {
return &GuardianOption{
name: "status-server",
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
if statusAddr != "" {
// Use a custom routing instead of using http.DefaultServeMux directly to avoid accidentally exposing packages
// that register themselves with it by default (like pprof).
router := mux.NewRouter()
// pprof server. NOT necessarily safe to expose publicly - only enable it in dev mode to avoid exposing it by
// accident. There's benefit to having pprof enabled on production nodes, but we would likely want to expose it
// via a dedicated port listening on localhost, or via the admin UNIX socket.
if g.env == common.UnsafeDevNet {
// Pass requests to http.DefaultServeMux, which pprof automatically registers with as an import side-effect.
router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
}
// Simple endpoint exposing node readiness (safe to expose to untrusted clients)
router.HandleFunc("/readyz", readiness.Handler)
// Prometheus metrics (safe to expose to untrusted clients)
router.Handle("/metrics", promhttp.Handler())
go func() {
logger.Info("status server listening", zap.String("status_addr", statusAddr))
// SECURITY: If making changes, ensure that we always do `router := mux.NewRouter()` before this to avoid accidentally exposing pprof
logger.Error("status server crashed", zap.Error(http.ListenAndServe(statusAddr, router))) // #nosec G114 local status server not vulnerable to DoS attack
}()
}
return nil
}}
}
type IbcWatcherConfig struct {
Websocket string
Lcd string
Contract string
}
func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherConfig *IbcWatcherConfig) *GuardianOption {
return &GuardianOption{
name: "watchers",
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
chainObsvReqC := make(map[vaa.ChainID]chan *gossipv1.ObservationRequest)
chainMsgC := make(map[vaa.ChainID]chan *common.MessagePublication)
for _, chainId := range vaa.GetAllNetworkIDs() {
chainMsgC[chainId] = make(chan *common.MessagePublication)
go func(c <-chan *common.MessagePublication, chainId vaa.ChainID) {
zeroAddress := vaa.Address{}
for {
select {
case <-ctx.Done():
return
case msg := <-c:
if msg.EmitterChain != chainId {
level := zapcore.FatalLevel
if g.env == common.GoTest {
// If we're in gotest, we don't want to os.Exit() here because that's hard to catch.
// Since continuing execution here doesn't have any side effects here, it's fine to have a
// differing behavior in GoTest mode.
level = zapcore.ErrorLevel
}
logger.Log(level, "SECURITY CRITICAL: Received observation from a chain that was not marked as originating from that chain",
zap.Stringer("tx", msg.TxHash),
zap.Stringer("emitter_address", msg.EmitterAddress),
zap.Uint64("sequence", msg.Sequence),
zap.Stringer("msgChainId", msg.EmitterChain),
zap.Stringer("watcherChainId", chainId),
)
} else if msg.EmitterAddress == zeroAddress {
level := zapcore.FatalLevel
if g.env == common.GoTest {
// If we're in gotest, we don't want to os.Exit() here because that's hard to catch.
// Since continuing execution here doesn't have any side effects here, it's fine to have a
// differing behavior in GoTest mode.
level = zapcore.ErrorLevel
}
logger.Log(level, "SECURITY ERROR: Received observation with EmitterAddress == 0x00",
zap.Stringer("tx", msg.TxHash),
zap.Stringer("emitter_address", msg.EmitterAddress),
zap.Uint64("sequence", msg.Sequence),
zap.Stringer("msgChainId", msg.EmitterChain),
zap.Stringer("watcherChainId", chainId),
)
} else {
g.msgC.writeC <- msg
}
}
}
}(chainMsgC[chainId], chainId)
}
watchers := make(map[watchers.NetworkID]interfaces.L1Finalizer)
for _, wc := range watcherConfigs {
if _, ok := watchers[wc.GetNetworkID()]; ok {
logger.Fatal("NetworkID already configured", zap.String("network_id", string(wc.GetNetworkID())))
}
watcherName := string(wc.GetNetworkID()) + "watch"
logger.Debug("Setting up watcher: " + watcherName)
if wc.GetNetworkID() != "solana-confirmed" { // TODO this should not be a special case, see comment in common/readiness.go
common.MustRegisterReadinessSyncing(wc.GetChainID())
}
chainObsvReqC[wc.GetChainID()] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if wc.RequiredL1Finalizer() != "" {
l1watcher, ok := watchers[wc.RequiredL1Finalizer()]
if !ok || l1watcher == nil {
logger.Fatal("L1finalizer does not exist. Please check the order of the watcher configurations in watcherConfigs. The L1 must be configured before this one.",
zap.String("ChainID", wc.GetChainID().String()),
zap.String("L1ChainID", string(wc.RequiredL1Finalizer())))
}
wc.SetL1Finalizer(l1watcher)
}
l1finalizer, runnable, err := wc.Create(chainMsgC[wc.GetChainID()], chainObsvReqC[wc.GetChainID()], g.setC.writeC, g.env)
if err != nil {
logger.Fatal("error creating watcher", zap.Error(err))
}
g.runnablesWithScissors[watcherName] = runnable
watchers[wc.GetNetworkID()] = l1finalizer
}
if ibcWatcherConfig != nil {
var chainConfig ibc.ChainConfig
for _, chainID := range ibc.Chains {
if _, exists := chainMsgC[chainID]; !exists {
return errors.New("invalid IBC chain ID")
}
if _, exists := chainObsvReqC[chainID]; exists {
logger.Warn("not monitoring chain with IBC because it is already registered.", zap.Stringer("chainID", chainID))
continue
}
chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
common.MustRegisterReadinessSyncing(chainID)
chainConfig = append(chainConfig, ibc.ChainConfigEntry{
ChainID: chainID,
MsgC: chainMsgC[chainID],
ObsvReqC: chainObsvReqC[chainID],
})
}
if len(chainConfig) > 0 {
logger.Info("Starting IBC watcher")
readiness.RegisterComponent(common.ReadinessIBCSyncing)
g.runnablesWithScissors["ibcwatch"] = ibc.NewWatcher(ibcWatcherConfig.Websocket, ibcWatcherConfig.Lcd, ibcWatcherConfig.Contract, chainConfig).Run
} else {
logger.Error("Although IBC is enabled, there are no chains for it to monitor")
}
}
go handleReobservationRequests(ctx, clock.New(), logger, g.obsvReqC.readC, chainObsvReqC)
return nil
}}
}
func GuardianOptionAdminService(socketPath string, ethRpc *string, ethContract *string, rpcMap map[string]string) *GuardianOption {
return &GuardianOption{
name: "admin-service",
dependencies: []string{"governor"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
adminService, err := adminServiceRunnable(
logger,
socketPath,
g.injectC.writeC,
g.signedInC.writeC,
g.obsvReqSendC.writeC,
g.db,
g.gst,
g.gov,
g.gk,
ethRpc,
ethContract,
rpcMap,
)
if err != nil {
logger.Fatal("failed to create admin service socket", zap.Error(err))
}
g.runnables["admin"] = adminService
return nil
}}
}
func GuardianOptionPublicRpcSocket(publicGRPCSocketPath string, publicRpcLogDetail common.GrpcLogDetail) *GuardianOption {
return &GuardianOption{
name: "publicrpcsocket",
dependencies: []string{"governor"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
// local public grpc service socket
publicrpcUnixService, publicrpcServer, err := publicrpcUnixServiceRunnable(logger, publicGRPCSocketPath, publicRpcLogDetail, g.db, g.gst, g.gov)
if err != nil {
logger.Fatal("failed to create publicrpc service socket", zap.Error(err))
}
g.runnables["publicrpcsocket"] = publicrpcUnixService
g.publicrpcServer = publicrpcServer
return nil
}}
}
func GuardianOptionPublicrpcTcpService(publicRpc string, publicRpcLogDetail common.GrpcLogDetail) *GuardianOption {
return &GuardianOption{
name: "publicrpc",
dependencies: []string{"governor", "publicrpcsocket"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
publicrpcService, err := publicrpcTcpServiceRunnable(logger, publicRpc, publicRpcLogDetail, g.db, g.gst, g.gov)
if err != nil {
return err
}
g.runnables["publicrpc"] = publicrpcService
return nil
}}
}
func GuardianOptionPublicWeb(listenAddr string, publicGRPCSocketPath string, tlsHostname string, tlsProdEnv bool, tlsCacheDir string) *GuardianOption {
return &GuardianOption{
name: "publicweb",
dependencies: []string{"publicrpcsocket"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
publicwebService := publicwebServiceRunnable(logger, listenAddr, publicGRPCSocketPath, g.publicrpcServer,
tlsHostname, tlsProdEnv, tlsCacheDir)
g.runnables["publicweb"] = publicwebService
return nil
}}
}
func GuardianOptionBigTablePersistence(config *reporter.BigTableConnectionConfig) *GuardianOption {
return &GuardianOption{
name: "bigtable",
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
g.runnables["bigtable"] = reporter.BigTableWriter(g.attestationEvents, config)
return nil
}}
}
type G struct {
// rootCtxCancel is a context.CancelFunc. It MUST be a root context for any context that is passed to any member function of G.
// It can be used by components to shut down the entire node if they encounter an unrecoverable state.
rootCtxCancel context.CancelFunc
env common.Environment
// keys
gk *ecdsa.PrivateKey
// components
db *db.Database
gst *common.GuardianSetState
acct *accountant.Accountant
gov *governor.ChainGovernor
attestationEvents *reporter.AttestationEventReporter
wormchainConn *wormconn.ClientConn
publicrpcServer *grpc.Server
// runnables
runnablesWithScissors map[string]supervisor.Runnable
runnables map[string]supervisor.Runnable
// various channels
// Outbound gossip message queue (needs to be read/write because p2p needs read/write)
gossipSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *gossipv1.SignedObservation
// Finalized guardian observations aggregated across all chains
msgC channelPair[*common.MessagePublication]
// Ethereum incoming guardian set updates
setC channelPair[*common.GuardianSet]
// Inbound signed VAAs
signedInC channelPair[*gossipv1.SignedVAAWithQuorum]
// Inbound observation requests from the p2p service (for all chains)
obsvReqC channelPair[*gossipv1.ObservationRequest]
// Outbound observation requests
obsvReqSendC channelPair[*gossipv1.ObservationRequest]
// Injected VAAs (manually generated rather than created via observation)
injectC channelPair[*vaa.VAA]
// acctC is the channel where messages will be put after they reached quorum in the accountant.
acctC channelPair[*common.MessagePublication]
}
func NewGuardianNode(
env common.Environment,
db *db.Database,
gk *ecdsa.PrivateKey,
wormchainConn *wormconn.ClientConn, // TODO does this need to be here?
) *G {
g := G{
env: env,
db: db,
gk: gk,
wormchainConn: wormchainConn,
}
return &g
}
// initializeBasic sets up everything that every GuardianNode needs before any options can be applied.
func (g *G) initializeBasic(logger *zap.Logger, rootCtxCancel context.CancelFunc) {
g.rootCtxCancel = rootCtxCancel
// Setup various channels...
g.gossipSendC = make(chan []byte)
g.obsvC = make(chan *gossipv1.SignedObservation, inboundObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestOutboundBufferSize)
g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)
g.injectC = makeChannelPair[*vaa.VAA](0)
g.acctC = makeChannelPair[*common.MessagePublication](accountant.MsgChannelCapacity)
// Guardian set state managed by processor
g.gst = common.NewGuardianSetState(nil)
// provides methods for reporting progress toward message attestation, and channels for receiving attestation lifecycle events.
g.attestationEvents = reporter.EventListener(logger)
// allocate maps
g.runnablesWithScissors = make(map[string]supervisor.Runnable)
g.runnables = make(map[string]supervisor.Runnable)
}
// applyOptions applies `options` to the GuardianNode.
// Each option must have a unique option.name.
// If an option has `dependencies`, they must be defined before that option.
func (g *G) applyOptions(ctx context.Context, logger *zap.Logger, options []*GuardianOption) error {
configuredComponents := make(map[string]struct{}) // using `map[string]struct{}` to implement a set here
for _, option := range options {
// check that this component has not been configured yet
if _, ok := configuredComponents[option.name]; ok {
return fmt.Errorf("Component %s is already configured and cannot be configured a second time.", option.name)
}
// check that all dependencies have been met
for _, dep := range option.dependencies {
if _, ok := configuredComponents[dep]; !ok {
return fmt.Errorf("Component %s requires %s to be configured first. Check the order of your options.", option.name, dep)
}
}
// run the config
err := option.f(ctx, logger, g)
if err != nil {
return fmt.Errorf("Error applying option for component %s: %w", option.name, err)
}
// mark the component as configured
configuredComponents[option.name] = struct{}{}
}
return nil
}
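A hedged sketch of what the ordering rule means in practice: because GuardianOptionAdminService depends on "governor", the governor option has to come earlier in the slice handed to Run; reversing the two entries below would make applyOptions return the "requires ... to be configured first" error. The socket path and rpcMap values are placeholders:

// exampleOptions is a hypothetical, correctly ordered option list: "governor"
// is configured before "admin-service", satisfying the declared dependency.
var exampleOptions = []*GuardianOption{
	GuardianOptionGovernor(false),
	GuardianOptionAdminService("/tmp/example_admin.socket", nil, nil, map[string]string{}),
}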
func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) supervisor.Runnable {
return func(ctx context.Context) error {
logger := supervisor.Logger(ctx)
g.initializeBasic(logger, rootCtxCancel)
if err := g.applyOptions(ctx, logger, options); err != nil {
logger.Fatal("failed to initialize GuardianNode", zap.Error(err))
}
// Start the watchers
for runnableName, runnable := range g.runnablesWithScissors {
logger.Info("Starting runnablesWithScissors: " + runnableName)
if err := supervisor.Run(ctx, runnableName, common.WrapWithScissors(runnable, runnableName)); err != nil {
logger.Fatal("error starting runnablesWithScissors", zap.Error(err))
}
}
if g.acct != nil {
logger.Info("Starting accountant")
if err := g.acct.Start(ctx); err != nil {
logger.Fatal("acct: failed to start accountant", zap.Error(err))
}
}
if g.gov != nil {
logger.Info("Starting governor")
if err := g.gov.Run(ctx); err != nil {
logger.Fatal("failed to create chain governor", zap.Error(err))
}
}
logger.Info("Starting processor")
if err := supervisor.Run(ctx, "processor", processor.NewProcessor(ctx,
g.db,
g.msgC.readC,
g.setC.readC,
g.gossipSendC,
g.obsvC,
g.obsvReqSendC.writeC,
g.injectC.readC,
g.signedInC.readC,
g.gk,
g.gst,
g.attestationEvents,
g.gov,
g.acct,
g.acctC.readC,
).Run); err != nil {
logger.Fatal("failed to start processor", zap.Error(err))
}
// Start any other runnables
for name, runnable := range g.runnables {
if err := supervisor.Run(ctx, name, runnable); err != nil {
logger.Fatal("failed to start other runnable", zap.Error(err))
}
}
logger.Info("Started internal services")
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
return nil
}
}
type channelPair[T any] struct {
readC <-chan T
writeC chan<- T
}
func makeChannelPair[T any](cap int) channelPair[T] {
out := make(chan T, cap)
return channelPair[T]{out, out}
}
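To make the channelPair helper concrete, a small sketch (the function name is hypothetical): both halves wrap the same underlying channel, so a writer holding only writeC and a reader holding only readC communicate directly while the types document the direction:

// exampleChannelPairUsage demonstrates that writeC and readC are two views of
// the same buffered channel: a value sent on the write half is received on
// the read half.
func exampleChannelPairUsage() {
	pair := makeChannelPair[int](1)
	pair.writeC <- 42
	fmt.Println(<-pair.readC) // prints 42
}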

543
node/pkg/node/node_test.go Normal file
View File

@ -0,0 +1,543 @@
package node
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"fmt"
math_rand "math/rand"
"net/http"
"os"
"regexp"
"strconv"
"testing"
"time"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/devnet"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/mock"
eth_crypto "github.com/ethereum/go-ethereum/crypto"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
eth_common "github.com/ethereum/go-ethereum/common"
)
const LOCAL_RPC_PORTRANGE_START = 10000
const LOCAL_P2P_PORTRANGE_START = 10100
const LOCAL_STATUS_PORTRANGE_START = 10200
type mockGuardian struct {
p2pKey libp2p_crypto.PrivKey
MockObservationC chan *common.MessagePublication
MockSetC chan *common.GuardianSet
gk *ecdsa.PrivateKey
guardianAddr eth_common.Address
ready bool
}
func newMockGuardianSet(n int) []*mockGuardian {
gs := make([]*mockGuardian, n)
for i := 0; i < n; i++ {
// generate guardian key
gk, err := ecdsa.GenerateKey(eth_crypto.S256(), rand.Reader)
if err != nil {
panic(err)
}
gs[i] = &mockGuardian{
p2pKey: devnet.DeterministicP2PPrivKeyByIndex(int64(i)),
MockObservationC: make(chan *common.MessagePublication),
MockSetC: make(chan *common.GuardianSet),
gk: gk,
guardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey),
}
}
return gs
}
func mockGuardianSetToGuardianAddrList(gs []*mockGuardian) []eth_common.Address {
result := make([]eth_common.Address, len(gs))
for i, g := range gs {
result[i] = g.guardianAddr
}
return result
}
func mockPublicSocket(mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_public.socket", mockGuardianIndex)
}
func mockAdminStocket(mockGuardianIndex uint) string {
return fmt.Sprintf("/tmp/test_guardian_%d_admin.socket", mockGuardianIndex)
}
func mockPublicRpc(mockGuardianIndex uint) string {
return fmt.Sprintf("127.0.0.1:%d", mockGuardianIndex+LOCAL_RPC_PORTRANGE_START)
}
func mockStatusPort(mockGuardianIndex uint) uint {
return mockGuardianIndex + LOCAL_STATUS_PORTRANGE_START
}
// mockGuardianRunnable returns a runnable that first sets up a mock guardian and then runs it.
func mockGuardianRunnable(gs []*mockGuardian, mockGuardianIndex uint, obsDb mock.ObservationDb) supervisor.Runnable {
return func(ctx context.Context) error {
// Create a sub-context with a cancel function that we can pass to G.Run.
ctx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()
logger := supervisor.Logger(ctx)
// setup db
dataDir := fmt.Sprintf("/tmp/test_guardian_%d", mockGuardianIndex)
_ = os.RemoveAll(dataDir) // delete any pre-existing data
db := db.OpenDb(logger, &dataDir)
defer db.Close()
// set environment
env := common.GoTest
// setup a mock watcher
var watcherConfigs = []watchers.WatcherConfig{
&mock.WatcherConfig{
NetworkID: "mock",
ChainID: vaa.ChainIDSolana,
MockObservationC: gs[mockGuardianIndex].MockObservationC,
MockSetC: gs[mockGuardianIndex].MockSetC,
ObservationDb: obsDb, // TODO(future work) add observation DB to support re-observation request
},
}
// configure p2p
nodeName := fmt.Sprintf("g-%d", mockGuardianIndex)
networkID := "/wormhole/localdev"
zeroPeerId, err := libp2p_peer.IDFromPublicKey(gs[0].p2pKey.GetPublic())
if err != nil {
return err
}
bootstrapPeers := fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", LOCAL_P2P_PORTRANGE_START, zeroPeerId.String())
p2pPort := uint(LOCAL_P2P_PORTRANGE_START + mockGuardianIndex)
// configure publicRpc
publicSocketPath := mockPublicSocket(mockGuardianIndex)
publicRpc := mockPublicRpc(mockGuardianIndex)
// configure adminservice
adminSocketPath := mockAdminStocket(mockGuardianIndex)
rpcMap := make(map[string]string)
// assemble all the options
guardianOptions := []*GuardianOption{
GuardianOptionWatchers(watcherConfigs, nil),
GuardianOptionAccountant("", "", false), // effectively disable accountant
GuardianOptionGovernor(false), // disable governor
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, p2pPort, func() string { return "" }),
GuardianOptionPublicRpcSocket(publicSocketPath, common.GrpcLogDetailFull),
GuardianOptionPublicrpcTcpService(publicRpc, common.GrpcLogDetailFull),
GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(mockGuardianIndex))),
}
guardianNode := NewGuardianNode(
env,
db,
gs[mockGuardianIndex].gk,
nil,
)
if err = supervisor.Run(ctx, "g", guardianNode.Run(ctxCancel, guardianOptions...)); err != nil {
panic(err)
}
<-ctx.Done()
// cleanup
// _ = os.RemoveAll(dataDir) // we don't do this for now since this could run before BadgerDB's flush(), causing an error; Meh
return nil
}
}
// setupLogsCapture is a helper function for making a zap logger/observer combination for testing that certain logs have been made
func setupLogsCapture() (*zap.Logger, *observer.ObservedLogs) {
observedCore, logs := observer.New(zap.DebugLevel)
logger, _ := zap.NewDevelopment(zap.WrapCore(func(c zapcore.Core) zapcore.Core { return zapcore.NewTee(c, observedCore) }))
return logger, logs
}
func waitForHeartbeats(t *testing.T, zapObserver *observer.ObservedLogs, gs []*mockGuardian) {
// example log entry that we're looking for:
// DEBUG root.g-2.g.p2p p2p/p2p.go:465 valid signed heartbeat received {"value": "node_name:\"g-0\" timestamp:1685677055425243683 version:\"development\" guardian_addr:\"0xeF2a03eAec928DD0EEAf35aD31e34d2b53152c07\" boot_timestamp:1685677040424855922 p2p_node_id:\"\\x00$\\x08\\x01\\x12 \\x97\\xf3\\xbd\\x87\\x13\\x15(\\x1e\\x8b\\x83\\xedǩ\\xfd\\x05A\\x06aTD\\x90p\\xcc\\xdb<\\xddB\\xcfi\\xccވ\"", "from": "12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"}
// TODO maybe instead of looking at log entries, we could determine this status through prometheus metrics, which might be more stable
re := regexp.MustCompile("g-[0-9]+")
for readyCounter := 0; readyCounter < len(gs); {
// read log messages
for _, loggedEntry := range zapObserver.FilterMessage("valid signed heartbeat received").All() {
for _, f := range loggedEntry.Context {
if f.Key == "value" {
s, ok := f.Interface.(fmt.Stringer)
assert.True(t, ok)
match := re.FindStringSubmatch(s.String())
assert.NotZero(t, len(match))
guardianId, err := strconv.Atoi(match[0][2:])
assert.NoError(t, err)
assert.True(t, guardianId < len(gs))
if gs[guardianId].ready == false {
gs[guardianId].ready = true
readyCounter++
}
}
}
}
time.Sleep(time.Microsecond * 100)
}
}
type testCase struct {
msg *common.MessagePublication // a Wormhole message
// number of Guardians who will initially observe this message through the mock watcher
numGuardiansObserve int
// if true, Guardians will not observe this message in the mock watcher, if they receive a reobservation request for it
unavailableInReobservation bool
// if true, the test environment will inject a reobservation request signed by Guardian 1,
// as if that Guardian had made a manual reobservation request through an admin command
performManualReobservationRequest bool
// if true, assert that a VAA eventually exists for this message
mustReachQuorum bool
// if true, assert that no VAA exists for this message at the end of the test.
// Note that it is not guaranteed that this message will never reach quorum because it may reach quorum some time after the test run finishes.
mustNotReachQuorum bool
}
func randomTime() time.Time {
return time.Unix(int64(math_rand.Uint32()%1700000000), 0) // nolint // convert time to unix and back to match what is done during serialization/de-serialization
}
var messageSequenceCounter uint64 = 0
func someMessage() *common.MessagePublication {
messageSequenceCounter++
return &common.MessagePublication{
TxHash: [32]byte{byte(messageSequenceCounter % 8), byte(messageSequenceCounter / 8), 3},
Timestamp: randomTime(),
Nonce: math_rand.Uint32(), //nolint
Sequence: messageSequenceCounter,
ConsistencyLevel: 1,
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: [32]byte{1, 2, 3},
Payload: []byte{},
Unreliable: false,
}
}
func makeObsDb(tc []testCase) mock.ObservationDb {
db := make(map[eth_common.Hash]*common.MessagePublication)
for _, t := range tc {
if t.unavailableInReobservation {
continue
}
db[t.msg.TxHash] = t.msg
}
return db
}
// #nosec G107 -- it's OK to make http requests with `statusAddr` because `statusAddr` is trusted.
func testStatusServer(ctx context.Context, logger *zap.Logger, statusAddr string) error {
// Check /readyz
for {
url := statusAddr + "/readyz"
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
break
}
logger.Info("StatusServer error, waiting 100ms...", zap.String("url", url))
time.Sleep(time.Millisecond * 100)
}
// Check /metrics (prometheus)
for {
url := statusAddr + "/metrics"
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
break
}
logger.Info("StatusServer error, waiting 100ms...", zap.String("url", url))
time.Sleep(time.Millisecond * 100)
}
return nil
}
func TestNodes(t *testing.T) {
const testTimeout = time.Second * 60
const numGuardians = 4 // Quorum will be 3 out of 4 guardians.
const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something)
const vaaCheckGuardianIndex uint = 0 // we will query this guardian's publicrpc for VAAs
const adminRpcGuardianIndex uint = 0 // we will query this guardian's adminRpc
msgZeroEmitter := someMessage()
msgZeroEmitter.EmitterAddress = vaa.Address{}
msgGovEmitter := someMessage()
msgGovEmitter.EmitterAddress = vaa.GovernanceEmitter
msgWrongEmitterChain := someMessage()
msgWrongEmitterChain.EmitterChain = vaa.ChainIDEthereum
// define the test cases to be executed
testCases := []testCase{
{ // one malicious Guardian makes an observation + sends a re-observation request; this should not reach quorum
msg: someMessage(),
numGuardiansObserve: 1,
mustNotReachQuorum: true,
unavailableInReobservation: true,
},
{ // message with EmitterAddress == 0 should not reach quorum
msg: msgZeroEmitter,
numGuardiansObserve: numGuardians,
mustNotReachQuorum: true,
},
{ // message with Governance emitter should not reach quorum
msg: msgGovEmitter,
numGuardiansObserve: numGuardians,
mustNotReachQuorum: true,
},
{ // message with wrong EmitterChain should not reach quorum
msg: msgWrongEmitterChain,
numGuardiansObserve: numGuardians,
mustNotReachQuorum: true,
},
{ // vanilla case, where only a quorum of guardians gets the message
msg: someMessage(),
numGuardiansObserve: numGuardians*2/3 + 1,
mustReachQuorum: true,
},
{ // No Guardian makes the observation while watching, but we do a manual reobservation request.
msg: someMessage(),
numGuardiansObserve: 0,
mustReachQuorum: true,
performManualReobservationRequest: true,
},
// TODO add a testcase to test the automatic re-observation requests.
// Need to refactor various usage of wall time to a mockable time first. E.g. using https://github.com/benbjohnson/clock
}
// Test's main lifecycle context.
rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), testTimeout)
defer rootCtxCancel()
readiness.NoPanic = true // otherwise we'd panic when running multiple guardians
zapLogger, zapObserver := setupLogsCapture()
supervisor.New(rootCtx, zapLogger, func(ctx context.Context) error {
logger := supervisor.Logger(ctx)
// create the Guardian Set
gs := newMockGuardianSet(numGuardians)
obsDb := makeObsDb(testCases)
// run the guardians
for i := 0; i < numGuardians; i++ {
gRun := mockGuardianRunnable(gs, uint(i), obsDb)
err := supervisor.Run(ctx, fmt.Sprintf("g-%d", i), gRun)
assert.NoError(t, err)
}
logger.Info("All Guardians initiated.")
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Inform them of the Guardian Set
commonGuardianSet := common.GuardianSet{
Keys: mockGuardianSetToGuardianAddrList(gs),
Index: guardianSetIndex,
}
for i, g := range gs {
logger.Info("Sending guardian set update", zap.Int("guardian_index", i))
g.MockSetC <- &commonGuardianSet
}
// wait for the status server to come online and check that it works
for i := range gs {
err := testStatusServer(ctx, logger, fmt.Sprintf("http://127.0.0.1:%d", mockStatusPort(uint(i))))
assert.NoError(t, err)
}
// Wait for them to connect each other and receive at least one heartbeat.
// This is necessary because if they have not joined the p2p network yet, gossip messages may get dropped silently.
waitForHeartbeats(t, zapObserver, gs)
logger.Info("All Guardians have received at least one heartbeat.")
// have them make observations
for _, testCase := range testCases {
select {
case <-ctx.Done():
return nil
default:
// make the first testCase.numGuardiansObserve guardians observe it
for guardianIndex, g := range gs {
if guardianIndex >= testCase.numGuardiansObserve {
break
}
msgCopy := *testCase.msg
logger.Info("requesting mock observation for guardian", msgCopy.ZapFields(zap.Int("guardian_index", guardianIndex))...)
g.MockObservationC <- &msgCopy
}
}
}
// Wait for adminrpc to come online
for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", mockAdminStocket(adminRpcGuardianIndex))).Len() == 0 {
logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// Send manual re-observation requests
func() { // put this in own function to use defer
s := fmt.Sprintf("unix:///%s", mockAdminStocket(vaaCheckGuardianIndex))
conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
c := nodev1.NewNodePrivilegedServiceClient(conn)
for i, testCase := range testCases {
if testCase.performManualReobservationRequest {
// timeout for grpc query
logger.Info("injecting observation request through admin rpc", zap.Int("test_case", i))
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
_, err = c.SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{
ObservationRequest: &gossipv1.ObservationRequest{
ChainId: uint32(testCase.msg.EmitterChain),
TxHash: testCase.msg.TxHash[:],
},
})
queryCancel()
assert.NoError(t, err)
}
}
}()
// Wait for publicrpc to come online
for zapObserver.FilterMessage("publicrpc server listening").FilterField(zap.String("addr", mockPublicRpc(vaaCheckGuardianIndex))).Len() == 0 {
logger.Info("publicrpc seems to be offline (according to logs). Waiting 100ms...")
time.Sleep(time.Millisecond * 100)
}
// check that the VAAs were generated
logger.Info("Connecting to publicrpc...")
conn, err := grpc.DialContext(ctx, mockPublicRpc(vaaCheckGuardianIndex), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
c := publicrpcv1.NewPublicRPCServiceClient(conn)
gsAddrList := mockGuardianSetToGuardianAddrList(gs)
// ensure that all test cases have passed
for i, testCase := range testCases {
msg := testCase.msg
logger.Info("Checking result of testcase", zap.Int("test_case", i))
// poll the API until we get a response without error
var r *publicrpcv1.GetSignedVAAResponse
var err error
for {
select {
case <-ctx.Done():
assert.Fail(t, "timed out")
default:
// timeout for grpc query
logger.Info("attempting to query for VAA", zap.Int("test_case", i))
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
r, err = c.GetSignedVAA(queryCtx, &publicrpcv1.GetSignedVAARequest{
MessageId: &publicrpcv1.MessageID{
EmitterChain: publicrpcv1.ChainID(msg.EmitterChain),
EmitterAddress: msg.EmitterAddress.String(),
Sequence: msg.Sequence,
},
})
queryCancel()
if err != nil {
logger.Info("error querying for VAA. Trying agin in 100ms.", zap.Int("test_case", i), zap.Error(err))
}
}
if err == nil && r != nil {
logger.Info("Received VAA from publicrpc", zap.Int("test_case", i), zap.Binary("vaa_bytes", r.VaaBytes))
break
}
if testCase.mustNotReachQuorum {
// no need to re-try because we're expecting an error. (and later we'll assert that's indeed an error)
break
}
time.Sleep(time.Millisecond * 100)
}
assert.NotEqual(t, testCase.mustNotReachQuorum, testCase.mustReachQuorum) // either or
if testCase.mustNotReachQuorum {
assert.EqualError(t, err, "rpc error: code = NotFound desc = requested VAA not found in store")
} else if testCase.mustReachQuorum {
assert.NotNil(t, r)
returnedVaa, err := vaa.Unmarshal(r.VaaBytes)
assert.NoError(t, err)
// Check signatures
err = returnedVaa.Verify(gsAddrList)
assert.NoError(t, err)
// Match all the fields
assert.Equal(t, returnedVaa.Version, uint8(1))
assert.Equal(t, returnedVaa.GuardianSetIndex, uint32(guardianSetIndex))
assert.Equal(t, returnedVaa.Timestamp, msg.Timestamp)
assert.Equal(t, returnedVaa.Nonce, msg.Nonce)
assert.Equal(t, returnedVaa.Sequence, msg.Sequence)
assert.Equal(t, returnedVaa.ConsistencyLevel, msg.ConsistencyLevel)
assert.Equal(t, returnedVaa.EmitterChain, msg.EmitterChain)
assert.Equal(t, returnedVaa.EmitterAddress, msg.EmitterAddress)
assert.Equal(t, returnedVaa.Payload, msg.Payload)
}
}
// We're done!
logger.Info("Tests completed.")
supervisor.Signal(ctx, supervisor.SignalDone)
rootCtxCancel()
return nil
},
supervisor.WithPropagatePanic)
<-rootCtx.Done()
assert.NotEqual(t, rootCtx.Err(), context.DeadlineExceeded)
zapLogger.Info("Test root context cancelled, exiting...")
}

View File

@ -1,4 +1,4 @@
package guardiand
package node
import (
"fmt"

View File

@ -1,4 +1,4 @@
package guardiand
package node
import (
"context"

View File

@ -1,4 +1,4 @@
package guardiand
package node
import (
"context"

View File

@ -1,4 +1,4 @@
package guardiand
package node
import (
"context"

View File

@ -1,4 +1,4 @@
package guardiand
package node
import (
"fmt"

View File

@ -280,14 +280,15 @@ func Run(
ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
ctr := int64(0)
tick := time.NewTicker(15 * time.Second)
defer tick.Stop()
timer := time.NewTimer(time.Nanosecond) // Guardians should send out their first heartbeat immediately to speed up test runs.
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
case <-timer.C:
timer.Reset(15 * time.Second)
// create a heartbeat
b := func() []byte {

View File

@ -23,6 +23,8 @@ import (
"go.uber.org/zap"
)
const LOCAL_P2P_PORTRANGE_START = 11000
type G struct {
// arguments passed to p2p.New
obsvC chan *gossipv1.SignedObservation
@ -105,7 +107,7 @@ func TestWatermark(t *testing.T) {
var gs [4]*G
for i := range gs {
gs[i] = NewG(t, fmt.Sprintf("n%d", i))
gs[i].components.Port = uint(11000 + i)
gs[i].components.Port = uint(LOCAL_P2P_PORTRANGE_START + i)
gs[i].networkID = "/wormhole/localdev"
guardianset.Keys = append(guardianset.Keys, crypto.PubkeyToAddress(gs[i].gk.PublicKey))
@ -113,7 +115,7 @@ func TestWatermark(t *testing.T) {
id, err := p2ppeer.IDFromPublicKey(gs[0].priv.GetPublic())
require.NoError(t, err)
gs[i].bootstrapPeers = fmt.Sprintf("/ip4/127.0.0.1/udp/11000/quic/p2p/%s", id.String())
gs[i].bootstrapPeers = fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic/p2p/%s", LOCAL_P2P_PORTRANGE_START, id.String())
gs[i].gst.Set(guardianset)
gs[i].components.ConnMgr, _ = connmgr.NewConnManager(2, 3, connmgr.WithGracePeriod(2*time.Second))

View File

@ -12,6 +12,9 @@ import (
)
var (
// NoPanic is a hack to support running multiple guardians in one process.
// TODO: This package should be rewritten to support multiple registries in one process instead of using a global registry.
NoPanic = false
mu = sync.Mutex{}
registry = map[string]bool{}
)
@ -23,7 +26,10 @@ func RegisterComponent(component Component) {
mu.Lock()
defer mu.Unlock()
if _, ok := registry[string(component)]; ok {
panic("component already registered")
if !NoPanic {
panic("component already registered")
}
return
}
registry[string(component)] = false
}

View File

@ -0,0 +1,45 @@
package algorand
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
IndexerRPC string
IndexerToken string
AlgodRPC string
AlgodToken string
AppID uint64
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return ""
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
// empty
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
_ chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
return nil, NewWatcher(wc.IndexerRPC, wc.IndexerToken, wc.AlgodRPC, wc.AlgodToken, wc.AppID, msgC, obsvReqC).Run, nil
}

View File

@ -0,0 +1,43 @@
package aptos
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
Rpc string
Account string
Handle string
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return ""
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
// empty
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
_ chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
return nil, NewWatcher(wc.Rpc, wc.Account, wc.Handle, msgC, obsvReqC).Run, nil
}

View File

@ -0,0 +1,43 @@
package cosmwasm
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
Websocket string // Websocket URL
Lcd string // LCD
Contract string // hex representation of the contract address
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return ""
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
// empty
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
_ chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
return nil, NewWatcher(wc.Websocket, wc.Lcd, wc.Contract, msgC, obsvReqC, wc.ChainID, env).Run, nil
}

View File

@ -103,13 +103,13 @@ func NewWatcher(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
chainID vaa.ChainID,
unsafeDevMode bool,
env common.Environment,
) *Watcher {
// CosmWasm 1.0.0
contractAddressFilterKey := "execute._contract_address"
contractAddressLogKey := "_contract_address"
if chainID == vaa.ChainIDTerra && unsafeDevMode {
if chainID == vaa.ChainIDTerra && env == common.UnsafeDevNet {
// Terra Classic upgraded CosmWasm versions, so they now use the new format. Here is a message from their Discord:
// The v2.1.1 upgrade will occur on blockheight 13215800 on June 14th (2023) at approximately 14:00 UTC.
// Queries for transactions before that block no longer work, so we don't have to worry about supporting them.

View File

@ -0,0 +1,64 @@
package evm
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
Rpc string // RPC URL
Contract string // hex representation of the contract address
GuardianSetUpdateChain bool // if `true`, we will retrieve the GuardianSet from this chain and watch this chain for GuardianSet updates
WaitForConfirmations bool // (optional)
RootChainRpc string // (optional)
RootChainContract string // (optional)
L1FinalizerRequired watchers.NetworkID // (optional)
l1Finalizer interfaces.L1Finalizer
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return wc.L1FinalizerRequired
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
wc.l1Finalizer = l1finalizer
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
setC chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
// only actually use the guardian set channel if wc.GuardianSetUpdateChain == true
var setWriteC chan<- *common.GuardianSet = nil
if wc.GuardianSetUpdateChain {
setWriteC = setC
}
var devMode bool = (env == common.UnsafeDevNet)
watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, devMode)
watcher.SetWaitForConfirmations(wc.WaitForConfirmations)
if err := watcher.SetRootChainParams(wc.RootChainRpc, wc.RootChainContract); err != nil {
return nil, nil, err
}
watcher.SetL1Finalizer(wc.l1Finalizer)
return watcher, watcher.Run, nil
}

View File

@ -0,0 +1,47 @@
package mock
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type ObservationDb map[eth_common.Hash]*common.MessagePublication
// The Mock Watcher is a watcher that makes an observation whenever it receives a MessagePublication on MockObservationC.
type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
MockObservationC chan *common.MessagePublication // Channel to feed this watcher mock observations that it will then make
ObservationDb ObservationDb // If the watcher receives a re-observation request with a TxHash in this map, it will make the corresponding observation in this map.
MockSetC <-chan *common.GuardianSet
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return ""
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
// empty
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
setC chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
return nil, NewWatcherRunnable(msgC, obsvReqC, setC, wc), nil
}

View File

@ -0,0 +1,44 @@
package mock
import (
"context"
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
eth_common "github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
func NewWatcherRunnable(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
setC chan<- *common.GuardianSet,
c *WatcherConfig,
) supervisor.Runnable {
return func(ctx context.Context) error {
logger := supervisor.Logger(ctx)
supervisor.Signal(ctx, supervisor.SignalHealthy)
logger.Info("Mock Watcher running.")
for {
select {
case <-ctx.Done():
logger.Info("Mock Watcher shutting down.")
return nil
case observation := <-c.MockObservationC:
logger.Info("message observed", observation.ZapFields()...)
msgC <- observation
case gs := <-c.MockSetC:
setC <- gs
case o := <-obsvReqC:
hash := eth_common.BytesToHash(o.TxHash)
logger.Info("Received obsv request", zap.String("log_msg_type", "obsv_req_received"), zap.String("tx_hash", hash.Hex()))
msg, ok := c.ObservationDb[hash]
if ok {
msgC <- msg
}
}
}
}
}

View File

@ -0,0 +1,43 @@
package near
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
Rpc string
Contract string
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return ""
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
// empty
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
_ chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
var mainnet bool = (env == common.MainNet)
return nil, NewWatcher(wc.Rpc, wc.Contract, msgC, obsvReqC, mainnet).Run, nil
}

View File

@ -0,0 +1,58 @@
package solana
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
solana_types "github.com/gagliardetto/solana-go"
solana_rpc "github.com/gagliardetto/solana-go/rpc"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type WatcherConfig struct {
NetworkID watchers.NetworkID // unique identifier of the network
ChainID vaa.ChainID // ChainID
ReceiveObsReq bool // if false, this watcher will not get access to the observation request channel
Rpc string // RPC URL
Websocket string // Websocket URL
Contract string // hex representation of the contract address
Commitment solana_rpc.CommitmentType
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return ""
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
// empty
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
_ chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
solAddress, err := solana_types.PublicKeyFromBase58(wc.Contract)
if err != nil {
return nil, nil, err
}
if !wc.ReceiveObsReq {
obsvReqC = nil
}
watcher := NewSolanaWatcher(wc.Rpc, &wc.Websocket, solAddress, wc.Contract, msgC, obsvReqC, wc.Commitment, wc.ChainID)
return watcher, watcher.Run, nil
}

View File

@ -0,0 +1,45 @@
package sui
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
Rpc string
Websocket string
SuiMoveEventType string
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
return wc.NetworkID
}
func (wc *WatcherConfig) GetChainID() vaa.ChainID {
return wc.ChainID
}
func (wc *WatcherConfig) RequiredL1Finalizer() watchers.NetworkID {
return ""
}
func (wc *WatcherConfig) SetL1Finalizer(l1finalizer interfaces.L1Finalizer) {
// empty
}
func (wc *WatcherConfig) Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
_ chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
var devMode bool = (env == common.UnsafeDevNet)
return nil, NewWatcher(wc.Rpc, wc.Websocket, wc.SuiMoveEventType, devMode, msgC, obsvReqC).Run, nil
}

View File

@ -41,8 +41,8 @@ type (
unsafeDevMode bool
msgChan chan *common.MessagePublication
obsvReqC chan *gossipv1.ObservationRequest
msgChan chan<- *common.MessagePublication
obsvReqC <-chan *gossipv1.ObservationRequest
readinessSync readiness.Component
subId int64
@ -158,8 +158,8 @@ func NewWatcher(
suiWS string,
suiMoveEventType string,
unsafeDevMode bool,
messageEvents chan *common.MessagePublication,
obsvReqC chan *gossipv1.ObservationRequest,
messageEvents chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
) *Watcher {
return &Watcher{
suiRPC: suiRPC,

View File

@ -0,0 +1,26 @@
package watchers
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// NetworkID is a unique identifier of a watcher that is used to link watchers together for the purpose of L1 Finalizers.
// This is different from vaa.ChainID because there could be multiple watchers for a single chain (e.g. solana-confirmed and solana-finalized)
type NetworkID string
type WatcherConfig interface {
GetNetworkID() NetworkID
GetChainID() vaa.ChainID
RequiredL1Finalizer() NetworkID // returns NetworkID of the L1 Finalizer that should be used for this Watcher.
SetL1Finalizer(l1finalizer interfaces.L1Finalizer)
Create(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
setC chan<- *common.GuardianSet,
env common.Environment,
) (interfaces.L1Finalizer, supervisor.Runnable, error)
}
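To illustrate the NetworkID comment above, a hedged sketch of two watcher configs for the same chain that differ only in NetworkID and commitment level; the RPC endpoint is a placeholder and the Contract field is omitted for brevity:

package example

import (
	"github.com/certusone/wormhole/node/pkg/watchers"
	"github.com/certusone/wormhole/node/pkg/watchers/solana"
	solana_rpc "github.com/gagliardetto/solana-go/rpc"
	"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

// solanaWatchers is a hypothetical configuration: both entries target
// vaa.ChainIDSolana but carry distinct NetworkIDs so they can be wired up as
// separate watchers (one per commitment level).
var solanaWatchers = []watchers.WatcherConfig{
	&solana.WatcherConfig{
		NetworkID:  "solana-confirmed",
		ChainID:    vaa.ChainIDSolana,
		Rpc:        "http://localhost:8899", // placeholder endpoint
		Commitment: solana_rpc.CommitmentConfirmed,
	},
	&solana.WatcherConfig{
		NetworkID:     "solana-finalized",
		ChainID:       vaa.ChainIDSolana,
		ReceiveObsReq: true,
		Rpc:           "http://localhost:8899", // placeholder endpoint
		Commitment:    solana_rpc.CommitmentFinalized,
	},
}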