From 2744c1df2588a8766bb3a8ac36dcc6c2d70ecf6a Mon Sep 17 00:00:00 2001 From: Leo Date: Sun, 16 Aug 2020 17:05:58 +0200 Subject: [PATCH] bridge: heartbeat, eth watcher service --- Tiltfile | 2 +- bridge/cmd/guardiand/main.go | 78 +++++++++++++++++++++++++++++++++--- devnet/bridge.yaml | 4 ++ 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/Tiltfile b/Tiltfile index 3af198a9e..4b651f5a9 100644 --- a/Tiltfile +++ b/Tiltfile @@ -2,7 +2,7 @@ local_resource( name = "proto-gen", - deps = "proto/*", + deps = "./proto", cmd = "./generate.sh", ) diff --git a/bridge/cmd/guardiand/main.go b/bridge/cmd/guardiand/main.go index d145c522c..3b904054e 100644 --- a/bridge/cmd/guardiand/main.go +++ b/bridge/cmd/guardiand/main.go @@ -10,11 +10,17 @@ import ( "strings" "time" + eth_common "github.com/ethereum/go-ethereum/common" + "github.com/golang/protobuf/proto" "github.com/libp2p/go-libp2p-core/protocol" swarm "github.com/libp2p/go-libp2p-swarm" "github.com/multiformats/go-multiaddr" + "github.com/prometheus/common/log" "go.uber.org/zap" + "github.com/certusone/wormhole/bridge/pkg/common" + "github.com/certusone/wormhole/bridge/pkg/ethereum" + gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" "github.com/certusone/wormhole/bridge/pkg/supervisor" ipfslog "github.com/ipfs/go-log/v2" @@ -34,7 +40,13 @@ var ( p2pNetworkID = flag.String("network", "/wormhole/dev", "P2P network identifier") p2pPort = flag.Uint("port", 8999, "P2P UDP listener port") p2pBootstrap = flag.String("bootstrap", "", "P2P bootstrap peers (comma-separated)") + nodeKeyPath = flag.String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)") + + ethRPC = flag.String("ethRPC", "", "Ethereum RPC URL") + ethContract = flag.String("ethContract", "", "Ethereum bridge contract address") + ethConfirmations = flag.Uint64("ethConfirmations", 5, "Ethereum confirmation count requirement") + logLevel = flag.String("loglevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)") ) @@ -69,17 +81,32 @@ func main() { if *nodeKeyPath == "" { logger.Fatal("Please specify -nodeKey") } + if *ethRPC == "" { + logger.Fatal("Please specify -ethRPC") + } + + ethContractAddr := eth_common.HexToAddress(*ethContract) // Node's main lifecycle context. ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Ethereum lock event channel + ec := make(chan *common.ChainLock) + // Run supervisor. supervisor.New(ctx, logger.Desugar(), func(ctx context.Context) error { if err := supervisor.Run(ctx, "p2p", p2p); err != nil { return err } + watcher := ethereum.NewEthBridgeWatcher( + *ethRPC, ethContractAddr, *ethConfirmations, ec) + + if err := supervisor.Run(ctx, "eth", watcher.Run); err != nil { + return err + } + supervisor.Signal(ctx, supervisor.SignalHealthy) logger.Info("Created services") @@ -269,6 +296,7 @@ func p2p(ctx context.Context) error { } if successes == 0 { + h.Close() return fmt.Errorf("Failed to connect to any bootstrap peer") } else { logger.Info("Connected to bootstrap peers", zap.Int("num", successes)) @@ -285,15 +313,53 @@ func p2p(ctx context.Context) error { panic(err) } + th, err := ps.Join(topic) + if err != nil { + return fmt.Errorf("failed to join topic: %w", err) + } + + sub, err := th.Subscribe() + if err != nil { + return fmt.Errorf("failed to subscribe topic: %w", err) + } + logger.Info("Node has been started", zap.String("peer_id", h.ID().String()), zap.String("addrs", fmt.Sprintf("%v", h.Addrs()))) - for { - time.Sleep(1 * time.Second) - for _, p := range ps.ListPeers(topic) { - logger.Debug("Found pubsub peer", zap.String("peer_id", p.Pretty())) - } + hostname, err := os.Hostname() + if err != nil { + panic(err) } - select {} + go func() { + ctr := int64(0) + + for { + msg := gossipv1.Heartbeat{ + Hostname: hostname, + Index: ctr, + } + + b, err := proto.Marshal(&msg) + if err != nil { + panic(err) + } + + err = th.Publish(ctx, b) + if err != nil { + log.Warn("failed to publish message", zap.Error(err)) + } + + time.Sleep(15 * time.Second) + } + }() + + for { + msg, err := sub.Next(ctx) + if err != nil { + return fmt.Errorf("failed to receive pubsub message: %w", err) + } + + logger.Info("received message", zap.String("data", string(msg.Data)), zap.String("from", msg.GetFrom().String())) + } } diff --git a/devnet/bridge.yaml b/devnet/bridge.yaml index ca5a9388f..b49b3be35 100644 --- a/devnet/bridge.yaml +++ b/devnet/bridge.yaml @@ -43,6 +43,10 @@ spec: - /data/node.key - -bootstrap - /dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWQ1sV2kowPY1iJX1hJcVTysZjKv3sfULTGwhdpUGGZ1VF + - -ethRPC + - ws://eth-devnet:8545 + - -ethContract + - 0x5b1869D9A4C187F2EAa108f3062412ecf0526b24 ports: - containerPort: 8999 name: p2p