bridge: setup pubsub before connecting to bootnodes; buffer observations

This commit is contained in:
Hendrik Hofstadt 2020-08-31 16:11:09 +02:00
parent fb603d468c
commit d537f976f9
4 changed files with 24 additions and 24 deletions

View File

@ -165,7 +165,7 @@ func main() {
sendC := make(chan []byte) sendC := make(chan []byte)
// Inbound observations // Inbound observations
obsvC := make(chan *gossipv1.LockupObservation) obsvC := make(chan *gossipv1.LockupObservation, 50)
// VAAs to submit to Solana // VAAs to submit to Solana
solanaVaaC := make(chan *vaa.VAA) solanaVaaC := make(chan *vaa.VAA)

View File

@ -3,6 +3,8 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"strings" "strings"
"time" "time"
@ -10,14 +12,12 @@ import (
connmgr "github.com/libp2p/go-libp2p-connmgr" connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht" dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport" libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
libp2ptls "github.com/libp2p/go-libp2p-tls" libp2ptls "github.com/libp2p/go-libp2p-tls"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -98,6 +98,24 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", *p2pBootstrap)) logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", *p2pBootstrap))
topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast")
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
panic(err)
}
th, err := ps.Join(topic)
if err != nil {
return fmt.Errorf("failed to join topic: %w", err)
}
sub, err := th.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe topic: %w", err)
}
// Add our own bootstrap nodes // Add our own bootstrap nodes
// Count number of successful connection attempts. If we fail to connect to every bootstrap peer, kill // Count number of successful connection attempts. If we fail to connect to every bootstrap peer, kill
@ -141,24 +159,6 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
logger.Info("Connected to bootstrap peers", zap.Int("num", successes)) logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
} }
topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast")
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
panic(err)
}
th, err := ps.Join(topic)
if err != nil {
return fmt.Errorf("failed to join topic: %w", err)
}
sub, err := th.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe topic: %w", err)
}
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()), logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs()))) zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))

View File

@ -15,7 +15,7 @@ require (
github.com/libp2p/go-libp2p-connmgr v0.2.4 github.com/libp2p/go-libp2p-connmgr v0.2.4
github.com/libp2p/go-libp2p-core v0.6.1 github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-kad-dht v0.8.3 github.com/libp2p/go-libp2p-kad-dht v0.8.3
github.com/libp2p/go-libp2p-pubsub v0.3.3 github.com/libp2p/go-libp2p-pubsub v0.3.5
github.com/libp2p/go-libp2p-quic-transport v0.7.1 github.com/libp2p/go-libp2p-quic-transport v0.7.1
github.com/libp2p/go-libp2p-tls v0.1.3 github.com/libp2p/go-libp2p-tls v0.1.3
github.com/mattn/go-colorable v0.1.4 // indirect github.com/mattn/go-colorable v0.1.4 // indirect

View File

@ -429,8 +429,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.6 h1:2ACefBX23iMdJU9Ke+dcXt3w86MIryes
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.3.3 h1:/AzOAmjDc+IJWybEzhYj1UaV1HErqmo4v3pQVepbgi8= github.com/libp2p/go-libp2p-pubsub v0.3.5 h1:iF75GWpcxKEUQU8tTkgLy69qIQvfhL+t6U6ndQrB6ho=
github.com/libp2p/go-libp2p-pubsub v0.3.3/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI= github.com/libp2p/go-libp2p-pubsub v0.3.5/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI=
github.com/libp2p/go-libp2p-quic-transport v0.5.0/go.mod h1:IEcuC5MLxvZ5KuHKjRu+dr3LjCT1Be3rcD/4d8JrX8M= github.com/libp2p/go-libp2p-quic-transport v0.5.0/go.mod h1:IEcuC5MLxvZ5KuHKjRu+dr3LjCT1Be3rcD/4d8JrX8M=
github.com/libp2p/go-libp2p-quic-transport v0.7.1 h1:X6Ond9GANspXpgwJlSR9yxcMMD6SLBnGKRtwjBG5awc= github.com/libp2p/go-libp2p-quic-transport v0.7.1 h1:X6Ond9GANspXpgwJlSR9yxcMMD6SLBnGKRtwjBG5awc=
github.com/libp2p/go-libp2p-quic-transport v0.7.1/go.mod h1:TD31to4E5exogR/GWHClXCfkktigjAl5rXSt7HoxNvY= github.com/libp2p/go-libp2p-quic-transport v0.7.1/go.mod h1:TD31to4E5exogR/GWHClXCfkktigjAl5rXSt7HoxNvY=