2023-10-12 12:29:21 -07:00
package ccq
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"time"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
// GuardianSignature records a single guardian's signature over a query
// response digest.
type GuardianSignature struct {
	// Index is the signer's key index in the guardian set.
	Index int
	// Signature is the hex-encoded signature bytes.
	Signature string
}
// SignedResponse pairs a decoded query response with the guardian signatures
// that were collected for its digest.
type SignedResponse struct {
	// Response is the query response publication the signatures cover.
	Response *query.QueryResponsePublication
	// Signatures holds one entry per guardian that signed the response digest.
	Signatures []GuardianSignature
}
// P2PSub bundles the libp2p handles produced by runP2P: the subscription to
// the CCQ response topic, the joined request and response topics, and the
// underlying host.
type P2PSub struct {
	sub        *pubsub.Subscription // subscription on the response topic
	topic_req  *pubsub.Topic        // joined (not subscribed) request topic
	topic_resp *pubsub.Topic        // joined response topic
	host       host.Host            // libp2p host backing the topics
}
2023-12-21 12:28:15 -08:00
func runP2P ( ctx context . Context , priv crypto . PrivKey , port uint , networkID , bootstrapPeers , ethRpcUrl , ethCoreAddr string , pendingResponses * PendingResponses , logger * zap . Logger , monitorPeers bool ) ( * P2PSub , error ) {
2023-10-12 12:29:21 -07:00
// p2p setup
components := p2p . DefaultComponents ( )
components . Port = port
h , err := p2p . NewHost ( logger , ctx , networkID , bootstrapPeers , components , priv )
if err != nil {
return nil , err
}
topic_req := fmt . Sprintf ( "%s/%s" , networkID , "ccq_req" )
topic_resp := fmt . Sprintf ( "%s/%s" , networkID , "ccq_resp" )
logger . Info ( "Subscribing pubsub topic" , zap . String ( "topic_req" , topic_req ) , zap . String ( "topic_resp" , topic_resp ) )
// Comment from security team in PR #2981: CCQServers should have a parameter of D = 36, Dlo = 19, Dhi = 40, Dout = 18 such that they can reach all Guardians directly.
gossipParams := pubsub . DefaultGossipSubParams ( )
gossipParams . D = 36
gossipParams . Dlo = 19
gossipParams . Dhi = 40
gossipParams . Dout = 18
ps , err := pubsub . NewGossipSub ( ctx , h , pubsub . WithGossipSubParams ( gossipParams ) )
if err != nil {
logger . Error ( "failed to create gossip subscription" , zap . Error ( err ) )
return nil , err
}
th_req , err := ps . Join ( topic_req )
if err != nil {
logger . Error ( "failed to join request topic" , zap . String ( "topic_req" , topic_req ) , zap . Error ( err ) )
return nil , err
}
th_resp , err := ps . Join ( topic_resp )
if err != nil {
logger . Error ( "failed to join response topic" , zap . String ( "topic_resp" , topic_resp ) , zap . Error ( err ) )
return nil , err
}
sub , err := th_resp . Subscribe ( )
if err != nil {
logger . Error ( "failed to subscribe to response topic" , zap . Error ( err ) )
return nil , err
}
logger . Info ( "Node has been started" , zap . String ( "peer_id" , h . ID ( ) . String ( ) ) ,
zap . String ( "addrs" , fmt . Sprintf ( "%v" , h . Addrs ( ) ) ) )
2023-11-09 11:43:48 -08:00
bootstrappers , _ := p2p . BootstrapAddrs ( logger , bootstrapPeers , h . ID ( ) )
successes := p2p . ConnectToPeers ( ctx , logger , h , bootstrappers )
logger . Info ( "Connected to bootstrap peers" , zap . Int ( "num" , successes ) )
2023-10-12 12:29:21 -07:00
// Wait for peers
for len ( th_req . ListPeers ( ) ) < 1 {
time . Sleep ( time . Millisecond * 100 )
}
2023-11-09 11:43:48 -08:00
logger . Info ( "Found peers" , zap . Int ( "numPeers" , len ( th_req . ListPeers ( ) ) ) )
2023-10-12 12:29:21 -07:00
2023-12-21 12:28:15 -08:00
if monitorPeers {
logger . Info ( "Will monitor for missing peers once per minute." )
go func ( ) {
t := time . NewTicker ( time . Minute )
for {
select {
case <- ctx . Done ( ) :
logger . Info ( "Context cancelled, exiting peer monitoring." )
case <- t . C :
peers := th_req . ListPeers ( )
logger . Info ( "current peers" , zap . Int ( "numPeers" , len ( peers ) ) , zap . Any ( "peers" , peers ) )
peerMap := map [ string ] struct { } { }
for _ , peer := range peers {
peerMap [ peer . String ( ) ] = struct { } { }
}
for _ , p := range bootstrappers {
if _ , exists := peerMap [ p . ID . String ( ) ] ; ! exists {
logger . Info ( "attempting to reconnect to peer" , zap . String ( "peer" , p . ID . String ( ) ) )
if err := h . Connect ( ctx , p ) ; err != nil {
logger . Error ( "failed to reconnect to peer" , zap . String ( "peer" , p . ID . String ( ) ) , zap . Error ( err ) )
} else {
logger . Info ( "Reconnected to peer" , zap . String ( "peer" , p . ID . String ( ) ) )
peerMap [ p . ID . String ( ) ] = struct { } { }
successfulReconnects . Inc ( )
}
}
}
}
}
} ( )
}
2023-10-12 12:29:21 -07:00
// Fetch the initial current guardian set
guardianSet , err := FetchCurrentGuardianSet ( ethRpcUrl , ethCoreAddr )
if err != nil {
logger . Fatal ( "Failed to fetch current guardian set" , zap . Error ( err ) )
}
quorum := vaa . CalculateQuorum ( len ( guardianSet . Keys ) )
// Listen to the p2p network for query responses
go func ( ) {
// Maps the request signature to a map of response digests which maps to a list of guardian signatures.
// A request could have responses with different digests, because the guardians could have
// different results returned for the query in the event of a rollback.
responses := make ( map [ string ] map [ ethCommon . Hash ] [ ] GuardianSignature )
for {
envelope , err := sub . Next ( ctx )
if err != nil {
2023-11-09 11:43:48 -08:00
logger . Error ( "Failed to read next pubsub message" , zap . Error ( err ) )
return
2023-10-12 12:29:21 -07:00
}
var msg gossipv1 . GossipMessage
err = proto . Unmarshal ( envelope . Data , & msg )
if err != nil {
2023-11-08 09:22:34 -08:00
logger . Error ( "received invalid message" , zap . Binary ( "data" , envelope . Data ) , zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
inboundP2pError . WithLabelValues ( "failed_to_unmarshal_gossip_msg" ) . Inc ( )
2023-10-12 12:29:21 -07:00
continue
}
switch m := msg . Message . ( type ) {
case * gossipv1 . GossipMessage_SignedQueryResponse :
logger . Debug ( "query response received" , zap . Any ( "response" , m . SignedQueryResponse ) )
2023-11-08 17:47:28 -08:00
peerId := envelope . GetFrom ( ) . String ( )
queryResponsesReceived . WithLabelValues ( peerId ) . Inc ( )
2023-10-12 12:29:21 -07:00
var queryResponse query . QueryResponsePublication
err := queryResponse . Unmarshal ( m . SignedQueryResponse . QueryResponse )
if err != nil {
logger . Error ( "failed to unmarshal response" , zap . Error ( err ) )
2023-11-08 09:22:34 -08:00
inboundP2pError . WithLabelValues ( "failed_to_unmarshal_response" ) . Inc ( )
2023-10-12 12:29:21 -07:00
continue
}
requestSignature := hex . EncodeToString ( queryResponse . Request . Signature )
2023-11-08 17:47:28 -08:00
logger . Info ( "query response received from gossip" , zap . String ( "peerId" , peerId ) , zap . Any ( "requestId" , requestSignature ) )
2023-10-12 12:29:21 -07:00
// Check that we're handling the request for this response
pendingResponse := pendingResponses . Get ( requestSignature )
if pendingResponse == nil {
2023-11-08 09:22:34 -08:00
// This will happen for responses that come in after quorum is reached.
2023-10-12 12:29:21 -07:00
logger . Debug ( "skipping query response for unknown request" , zap . String ( "signature" , requestSignature ) )
continue
}
// Make sure that the request bytes match
if ! bytes . Equal ( queryResponse . Request . QueryRequest , pendingResponse . req . QueryRequest ) ||
! bytes . Equal ( queryResponse . Request . Signature , pendingResponse . req . Signature ) {
continue
}
digest := query . GetQueryResponseDigestFromBytes ( m . SignedQueryResponse . QueryResponse )
signerBytes , err := ethCrypto . Ecrecover ( digest . Bytes ( ) , m . SignedQueryResponse . Signature )
if err != nil {
logger . Error ( "failed to verify signature on response" ,
zap . String ( "digest" , digest . Hex ( ) ) ,
zap . String ( "signature" , hex . EncodeToString ( m . SignedQueryResponse . Signature ) ) ,
zap . Error ( err ) )
2023-11-08 09:22:34 -08:00
inboundP2pError . WithLabelValues ( "failed_to_verify_signature" ) . Inc ( )
2023-10-12 12:29:21 -07:00
continue
}
signerAddress := ethCommon . BytesToAddress ( ethCrypto . Keccak256 ( signerBytes [ 1 : ] ) [ 12 : ] )
keyIdx , hasKeyIdx := guardianSet . KeyIndex ( signerAddress )
if hasKeyIdx {
if _ , ok := responses [ requestSignature ] ; ! ok {
responses [ requestSignature ] = make ( map [ ethCommon . Hash ] [ ] GuardianSignature )
}
found := false
for _ , gs := range responses [ requestSignature ] [ digest ] {
if gs . Index == keyIdx {
found = true
break
}
}
if found {
// Already handled the response from this guardian
continue
}
responses [ requestSignature ] [ digest ] = append ( responses [ requestSignature ] [ digest ] , GuardianSignature {
Index : keyIdx ,
Signature : hex . EncodeToString ( m . SignedQueryResponse . Signature ) ,
} )
// quorum is reached when a super-majority of guardians have signed a response with the same digest
2023-11-09 11:43:48 -08:00
numSigners := len ( responses [ requestSignature ] [ digest ] )
if numSigners >= quorum {
2023-10-12 12:29:21 -07:00
s := & SignedResponse {
Response : & queryResponse ,
Signatures : responses [ requestSignature ] [ digest ] ,
}
delete ( responses , requestSignature )
2023-11-06 12:16:43 -08:00
select {
case pendingResponse . ch <- s :
2023-11-08 17:47:28 -08:00
logger . Info ( "forwarded query response" ,
zap . String ( "peerId" , peerId ) ,
zap . Any ( "requestId" , requestSignature ) ,
2023-11-09 11:43:48 -08:00
zap . Int ( "numSigners" , numSigners ) ,
2023-11-08 17:47:28 -08:00
zap . Int ( "quorum" , quorum ) ,
)
2023-11-06 12:16:43 -08:00
default :
2023-11-08 17:47:28 -08:00
logger . Error ( "failed to write query response to channel, dropping it" , zap . String ( "peerId" , peerId ) , zap . Any ( "requestId" , requestSignature ) )
2023-12-11 14:00:46 -08:00
// Leave the request in the pending map. It will get cleaned up if it times out.
2023-11-06 12:16:43 -08:00
}
2023-11-08 17:47:28 -08:00
} else {
logger . Info ( "waiting for more query responses" ,
zap . String ( "peerId" , peerId ) ,
zap . Any ( "requestId" , requestSignature ) ,
2023-11-09 11:43:48 -08:00
zap . Int ( "numSigners" , numSigners ) ,
2023-11-08 17:47:28 -08:00
zap . Int ( "quorum" , quorum ) ,
)
2023-10-12 12:29:21 -07:00
}
} else {
logger . Warn ( "received observation by unknown guardian - is our guardian set outdated?" ,
zap . String ( "digest" , digest . Hex ( ) ) , zap . String ( "address" , signerAddress . Hex ( ) ) ,
)
2023-11-08 09:22:34 -08:00
inboundP2pError . WithLabelValues ( "unknown_guardian" ) . Inc ( )
2023-10-12 12:29:21 -07:00
}
2023-11-08 09:22:34 -08:00
default :
// Since CCQ gossip is isolated, this really shouldn't happen.
logger . Debug ( "unexpected gossip message type" , zap . Any ( "msg" , m ) )
inboundP2pError . WithLabelValues ( "unexpected_gossip_msg_type" ) . Inc ( )
2023-10-12 12:29:21 -07:00
}
}
} ( )
return & P2PSub {
sub : sub ,
topic_req : th_req ,
topic_resp : th_resp ,
host : h ,
} , nil
}