bridge: heartbeat, eth watcher service

This commit is contained in:
Leo 2020-08-16 17:05:58 +02:00
parent 28fef7efca
commit 2744c1df25
3 changed files with 77 additions and 7 deletions

View File

@ -2,7 +2,7 @@
local_resource(
name = "proto-gen",
deps = "proto/*",
deps = "./proto",
cmd = "./generate.sh",
)

View File

@ -10,11 +10,17 @@ import (
"strings"
"time"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/golang/protobuf/proto"
"github.com/libp2p/go-libp2p-core/protocol"
swarm "github.com/libp2p/go-libp2p-swarm"
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/common/log"
"go.uber.org/zap"
"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/ethereum"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/bridge/pkg/supervisor"
ipfslog "github.com/ipfs/go-log/v2"
@ -34,7 +40,13 @@ var (
p2pNetworkID = flag.String("network", "/wormhole/dev", "P2P network identifier")
p2pPort = flag.Uint("port", 8999, "P2P UDP listener port")
p2pBootstrap = flag.String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
nodeKeyPath = flag.String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
ethRPC = flag.String("ethRPC", "", "Ethereum RPC URL")
ethContract = flag.String("ethContract", "", "Ethereum bridge contract address")
ethConfirmations = flag.Uint64("ethConfirmations", 5, "Ethereum confirmation count requirement")
logLevel = flag.String("loglevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
)
@ -69,17 +81,32 @@ func main() {
if *nodeKeyPath == "" {
logger.Fatal("Please specify -nodeKey")
}
if *ethRPC == "" {
logger.Fatal("Please specify -ethRPC")
}
ethContractAddr := eth_common.HexToAddress(*ethContract)
// Node's main lifecycle context.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Ethereum lock event channel
ec := make(chan *common.ChainLock)
// Run supervisor.
supervisor.New(ctx, logger.Desugar(), func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p); err != nil {
return err
}
watcher := ethereum.NewEthBridgeWatcher(
*ethRPC, ethContractAddr, *ethConfirmations, ec)
if err := supervisor.Run(ctx, "eth", watcher.Run); err != nil {
return err
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
logger.Info("Created services")
@ -269,6 +296,7 @@ func p2p(ctx context.Context) error {
}
if successes == 0 {
h.Close()
return fmt.Errorf("Failed to connect to any bootstrap peer")
} else {
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
@ -285,15 +313,53 @@ func p2p(ctx context.Context) error {
panic(err)
}
th, err := ps.Join(topic)
if err != nil {
return fmt.Errorf("failed to join topic: %w", err)
}
sub, err := th.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe topic: %w", err)
}
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
for {
time.Sleep(1 * time.Second)
for _, p := range ps.ListPeers(topic) {
logger.Debug("Found pubsub peer", zap.String("peer_id", p.Pretty()))
}
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
select {}
go func() {
ctr := int64(0)
for {
msg := gossipv1.Heartbeat{
Hostname: hostname,
Index: ctr,
}
b, err := proto.Marshal(&msg)
if err != nil {
panic(err)
}
err = th.Publish(ctx, b)
if err != nil {
log.Warn("failed to publish message", zap.Error(err))
}
time.Sleep(15 * time.Second)
}
}()
for {
msg, err := sub.Next(ctx)
if err != nil {
return fmt.Errorf("failed to receive pubsub message: %w", err)
}
logger.Info("received message", zap.String("data", string(msg.Data)), zap.String("from", msg.GetFrom().String()))
}
}

View File

@ -43,6 +43,10 @@ spec:
- /data/node.key
- -bootstrap
- /dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWQ1sV2kowPY1iJX1hJcVTysZjKv3sfULTGwhdpUGGZ1VF
- -ethRPC
- ws://eth-devnet:8545
- -ethContract
- 0x5b1869D9A4C187F2EAa108f3062412ecf0526b24
ports:
- containerPort: 8999
name: p2p