bridge: fix p2p routine restart

Turns out, libp2p cannot be cleaned up so till this is fixed,
we have to exit if p2p dies.

Expose the root context to p2p and cancel it.
This commit is contained in:
Leo 2020-08-17 14:55:51 +02:00
parent 090d0aca84
commit c4d53247d3
2 changed files with 28 additions and 33 deletions

View File

@ -29,6 +29,11 @@ var (
logLevel = flag.String("loglevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
)
var (
rootCtx context.Context
rootCtxCancel context.CancelFunc
)
func main() {
flag.Parse()
@ -67,33 +72,35 @@ func main() {
ethContractAddr := eth_common.HexToAddress(*ethContract)
// Node's main lifecycle context.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()
// Ethereum lock event channel
ec := make(chan *common.ChainLock)
// Run supervisor.
supervisor.New(ctx, logger.Desugar(), func(ctx context.Context) error {
supervisor.New(rootCtx, logger.Desugar(), func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p); err != nil {
return err
}
watcher := ethereum.NewEthBridgeWatcher(
*ethRPC, ethContractAddr, *ethConfirmations, ec)
if err := supervisor.Run(ctx, "eth", watcher.Run); err != nil {
if err := supervisor.Run(ctx, "eth",
ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, *ethConfirmations, ec).Run); err != nil {
return err
}
logger.Info("Started internal services")
supervisor.Signal(ctx, supervisor.SignalHealthy)
logger.Info("Created services")
select {}
}, supervisor.WithPropagatePanic)
// TODO(leo): only propagate panics in debug mode. We currently need this to properly reset p2p
// (it leaks its socket and we need to restart the process to fix it)
select {
case <-ctx.Done():
return nil
}
})
select {}
select {
case <-rootCtx.Done():
logger.Info("root context cancelled, exiting...")
// TODO: wait for things to shut down gracefully
}
}

View File

@ -26,8 +26,7 @@ import (
"github.com/certusone/wormhole/bridge/pkg/supervisor"
)
func p2p(ctx context.Context) error {
func p2p(ctx context.Context) (re error) {
logger := supervisor.Logger(ctx)
priv := bootstrapNodePrivateKeyHack()
@ -84,26 +83,18 @@ func p2p(ctx context.Context) error {
return idht, err
}),
)
defer func() {
fmt.Printf("h is %+v", h)
// FIXME: why can this be nil? We need to close the host to free the socket because apparently,
// closing the context is not enough, but sometimes h is nil when the function runs.
if h != nil {
h.Close()
}
}()
if err != nil {
panic(err)
}
defer func() {
// TODO: libp2p cannot be cleanly restarted (https://github.com/libp2p/go-libp2p/issues/992)
logger.Error("p2p routine has exited, cancelling root context...", zap.Error(re))
rootCtxCancel()
}()
logger.Info("Connecting to bootstrap peers")
// TODO(leo): use our own bootstrap peers rather than the IPFS ones so we have a dedicated network
//for _, addr := range dht.DefaultBootstrapPeers {
// pi, _ := peer.AddrInfoFromP2pAddr(addr)
// // We ignore errors as some bootstrap peers may be down and that is fine.
// _ = h.Connect(ctx, *pi)
//}
// Add our own bootstrap nodes
@ -146,9 +137,6 @@ func p2p(ctx context.Context) error {
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
}
// TODO(leo): crash if we couldn't connect to any bootstrap peers?
// (i.e. can we get stuck here if the other nodes have yet to come up?)
topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast")
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))