ccq: better proxy peering

This commit is contained in:
Evan Gray 2023-11-09 14:43:48 -05:00 committed by Evan Gray
parent 4bfcb89d4c
commit 22bf5f6987
5 changed files with 32 additions and 14 deletions

View File

@ -86,10 +86,15 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
bootstrappers, _ := p2p.BootstrapAddrs(logger, bootstrapPeers, h.ID())
successes := p2p.ConnectToPeers(ctx, logger, h, bootstrappers)
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
// Wait for peers
for len(th_req.ListPeers()) < 1 {
time.Sleep(time.Millisecond * 100)
}
logger.Info("Found peers", zap.Int("numPeers", len(th_req.ListPeers())))
// Fetch the initial current guardian set
guardianSet, err := FetchCurrentGuardianSet(ethRpcUrl, ethCoreAddr)
@ -107,7 +112,8 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
for {
envelope, err := sub.Next(ctx)
if err != nil {
logger.Fatal("Failed to read next pubsub message", zap.Error(err))
logger.Error("Failed to read next pubsub message", zap.Error(err))
return
}
var msg gossipv1.GossipMessage
err = proto.Unmarshal(envelope.Data, &msg)
@ -175,7 +181,8 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
Signature: hex.EncodeToString(m.SignedQueryResponse.Signature),
})
// quorum is reached when a super-majority of guardians have signed a response with the same digest
if len(responses[requestSignature][digest]) >= quorum {
numSigners := len(responses[requestSignature][digest])
if numSigners >= quorum {
s := &SignedResponse{
Response: &queryResponse,
Signatures: responses[requestSignature][digest],
@ -186,7 +193,7 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
logger.Info("forwarded query response",
zap.String("peerId", peerId),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", len(responses[requestSignature][digest])),
zap.Int("numSigners", numSigners),
zap.Int("quorum", quorum),
)
default:
@ -196,7 +203,7 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
logger.Info("waiting for more query responses",
zap.String("peerId", peerId),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", len(responses[requestSignature][digest])),
zap.Int("numSigners", numSigners),
zap.Int("quorum", quorum),
)
}

View File

@ -9,6 +9,8 @@ import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/telemetry"
@ -180,6 +182,15 @@ func runQueryServer(cmd *cobra.Command, args []string) {
}()
}
// Handle SIGTERM
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGTERM)
go func() {
<-sigterm
logger.Info("Received sigterm. exiting.")
cancel()
}()
<-ctx.Done()
logger.Info("Context cancelled, exiting...")

View File

@ -102,7 +102,7 @@ func (ccq *ccqP2p) run(
if _, found := ccq.allowedPeers[peerID.String()]; found {
return true
}
ccq.logger.Debug("Dropping subscribe attempt from unknown peer", zap.String("peerID", peerID.String()))
ccq.logger.Info("Dropping subscribe attempt from unknown peer", zap.String("peerID", peerID.String()))
return false
}))
if err != nil {
@ -129,7 +129,7 @@ func (ccq *ccqP2p) run(
if _, found := ccq.allowedPeers[from.String()]; found {
return true
}
ccq.logger.Debug("Dropping message from unknown peer", zap.String("fromPeerID", from.String()))
ccq.logger.Info("Dropping message from unknown peer", zap.String("fromPeerID", from.String()))
return false
})
if err != nil {

View File

@ -20,7 +20,7 @@ func TestMain(m *testing.M) {
func TestCutOverBootstrapAddrs(t *testing.T) {
logger, _ := zap.NewDevelopment()
bootstrappers, isBootstrapNode := bootstrapAddrs(logger, oldBootstrapPeers, "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu")
bootstrappers, isBootstrapNode := BootstrapAddrs(logger, oldBootstrapPeers, "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu")
assert.Equal(t, 2, len(bootstrappers))
assert.False(t, isBootstrapNode)
for _, ba := range bootstrappers {

View File

@ -150,9 +150,9 @@ func DefaultConnectionManager() (*connmgr.BasicConnMgr, error) {
)
}
// bootstrapAddrs takes a comma-separated string of multi-address strings and returns an array of []peer.AddrInfo that does not include `self`.
// BootstrapAddrs takes a comma-separated string of multi-address strings and returns an array of []peer.AddrInfo that does not include `self`.
// if `self` is part of `bootstrapPeers`, return isBootstrapNode=true
func bootstrapAddrs(logger *zap.Logger, bootstrapPeers string, self peer.ID) (bootstrappers []peer.AddrInfo, isBootstrapNode bool) {
func BootstrapAddrs(logger *zap.Logger, bootstrapPeers string, self peer.ID) (bootstrappers []peer.AddrInfo, isBootstrapNode bool) {
bootstrapPeers = cutOverBootstrapPeers(bootstrapPeers)
bootstrappers = make([]peer.AddrInfo, 0)
for _, addr := range strings.Split(bootstrapPeers, ",") {
@ -179,8 +179,8 @@ func bootstrapAddrs(logger *zap.Logger, bootstrapPeers string, self peer.ID) (bo
return
}
// connectToPeers connects `h` to `peers` and returns the number of successful connections.
func connectToPeers(ctx context.Context, logger *zap.Logger, h host.Host, peers []peer.AddrInfo) (successes int) {
// ConnectToPeers connects `h` to `peers` and returns the number of successful connections.
func ConnectToPeers(ctx context.Context, logger *zap.Logger, h host.Host, peers []peer.AddrInfo) (successes int) {
successes = 0
for _, p := range peers {
if err := h.Connect(ctx, p); err != nil {
@ -219,7 +219,7 @@ func NewHost(logger *zap.Logger, ctx context.Context, networkID string, bootstra
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))
bootstrappers, _ := bootstrapAddrs(logger, bootstrapPeers, h.ID())
bootstrappers, _ := BootstrapAddrs(logger, bootstrapPeers, h.ID())
// TODO(leo): Persistent data store (i.e. address book)
idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
@ -298,7 +298,7 @@ func Run(
topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
bootstrappers, bootstrapNode := bootstrapAddrs(logger, bootstrapPeers, h.ID())
bootstrappers, bootstrapNode := BootstrapAddrs(logger, bootstrapPeers, h.ID())
gossipParams := pubsub.DefaultGossipSubParams()
if bootstrapNode {
@ -340,7 +340,7 @@ func Run(
// Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI
// as peer discovery can take a long time).
successes := connectToPeers(ctx, logger, h, bootstrappers)
successes := ConnectToPeers(ctx, logger, h, bootstrappers)
if successes == 0 && !bootstrapNode { // If we're a bootstrap node it's okay to not have any peers.
// If we fail to connect to any bootstrap peer, kill the service