CCQ: Query Server (#3422)

* CCQ: Query Server

* More rework

* Clean up p2p code

* Health check change

* node: support http PUT or POST for new ccq queries

---------

Co-authored-by: Jeff Schroeder <jeffschroeder@computer.org>
This commit is contained in:
bruce-riley 2023-10-12 14:29:21 -05:00 committed by GitHub
parent 669e2bc40e
commit ad696d2f3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1276 additions and 0 deletions

60
devnet/query-server.yaml Normal file
View File

@ -0,0 +1,60 @@
apiVersion: v1
kind: Service
metadata:
name: query-server
labels:
app: query-server
spec:
ports:
- name: rest
port: 6069
protocol: TCP
selector:
app: query-server
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: query-server
spec:
selector:
matchLabels:
app: query-server
serviceName: query-server
replicas: 1
template:
metadata:
labels:
app: query-server
spec:
containers:
- name: query-server
image: guardiand-image
command:
- /guardiand
- query-server
- --env
- dev
- --nodeKey
- node/cmd/ccq/ccq.p2p.key
- --signerKey
- node/cmd/ccq/ccq.signing.key
- --listenAddr
- "[::]:6069"
- --permFile
- "node/cmd/ccq/devnet.config.json"
- --ethRPC
- http://eth-devnet:8545
- --ethContract
- "0xC89Ce4735882C9F0f0FE26686c53074E09B0D550"
# Hardcoded devnet bootstrap (generated from deterministic key in guardiand)
- --bootstrap
- /dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
- --logLevel=info
ports:
- containerPort: 6069
name: rest
protocol: TCP
readinessProbe:
tcpSocket:
port: rest

1
node/cmd/ccq/ccq.p2p.key Normal file
View File

@ -0,0 +1 @@
@~D&ðo%†[‰_jäÖszf=5´¨÷šfæC„d ¸Ó¥§“§91)„¬¡U£=˜Ö¿@JxÓ¼l ]aŒÀf?

View File

@ -0,0 +1,6 @@
-----BEGIN CCQ SERVER SIGNING KEY-----
PublicKey: 0x25021A4FCAf61F2EADC8202D3833Df48B2Fa0D54
CiCWNLSaicmcA2T563fLSM0r2uFviwPdA1VV9i76DlJh3Q==
=NV1/
-----END CCQ SERVER SIGNING KEY-----

View File

@ -0,0 +1,65 @@
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
},
{
"ethCall": {
"note:": "Total supply of WETH on Goerli",
"chain": 2,
"contractAddress": "0xB4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x18160ddd"
}
},
{
"ethCall": {
"note:": "Name of WETH on Devnet",
"chain": 2,
"contractAddress": "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E",
"call": "0x06fdde03"
}
},
{
"ethCall": {
"note:": "Total supply of WETH on Devnet",
"chain": 2,
"contractAddress": "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E",
"call": "0x18160ddd"
}
}
]
},
{
"userName": "Test User Two",
"apiKey": "my_secret_key_2",
"allowUnsigned": true,
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "0xB4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
},
{
"ethCall": {
"note:": "Name of WETH on Devnet",
"chain": 2,
"contractAddress": "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E",
"call": "0x06fdde03"
}
}
]
}
]
}

179
node/cmd/ccq/http.go Normal file
View File

@ -0,0 +1,179 @@
package ccq
import (
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"sort"
"time"
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/gorilla/mux"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
type queryRequest struct {
Bytes string `json:"bytes"`
Signature string `json:"signature"`
}
type queryResponse struct {
Bytes string `json:"bytes"`
Signatures []string `json:"signatures"`
}
type httpServer struct {
topic *pubsub.Topic
logger *zap.Logger
env common.Environment
permissions Permissions
signerKey *ecdsa.PrivateKey
pendingResponses *PendingResponses
}
func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for all requests.
w.Header().Set("Access-Control-Allow-Origin", "*")
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Methods", "PUT, POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, X-Api-Key")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
var q queryRequest
err := json.NewDecoder(r.Body).Decode(&q)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// There should be one and only one API key in the header.
apiKey, exists := r.Header["X-Api-Key"]
if !exists || len(apiKey) != 1 {
s.logger.Debug("received a request without an api key", zap.Stringer("url", r.URL), zap.Error(err))
http.Error(w, "api key is missing", http.StatusUnauthorized)
return
}
queryRequestBytes, err := hex.DecodeString(q.Bytes)
if err != nil {
s.logger.Debug("failed to decode request bytes", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
signature, err := hex.DecodeString(q.Signature)
if err != nil {
s.logger.Debug("failed to decode signature bytes", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
signedQueryRequest := &gossipv1.SignedQueryRequest{
QueryRequest: queryRequestBytes,
Signature: signature,
}
if status, err := validateRequest(s.logger, s.env, s.permissions, s.signerKey, apiKey[0], signedQueryRequest); err != nil {
// Don't need to log here because the details were logged in the function.
http.Error(w, err.Error(), status)
return
}
m := gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedQueryRequest{
SignedQueryRequest: signedQueryRequest,
},
}
b, err := proto.Marshal(&m)
if err != nil {
s.logger.Error("failed to marshal gossip message", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
pendingResponse := NewPendingResponse(signedQueryRequest)
added := s.pendingResponses.Add(pendingResponse)
if !added {
http.Error(w, "Duplicate request", http.StatusBadRequest)
return
}
err = s.topic.Publish(r.Context(), b)
if err != nil {
s.logger.Error("failed to publish gossip message", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
s.pendingResponses.Remove(pendingResponse)
return
}
// Wait for the response or timeout
select {
case <-time.After(query.RequestTimeout + 5*time.Second):
http.Error(w, "Timed out waiting for response", http.StatusGatewayTimeout)
case res := <-pendingResponse.ch:
resBytes, err := res.Response.Marshal()
if err != nil {
s.logger.Error("failed to marshal response", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
break
}
// Signature indices must be ascending for on-chain verification
sort.Slice(res.Signatures, func(i, j int) bool {
return res.Signatures[i].Index < res.Signatures[j].Index
})
signatures := make([]string, 0, len(res.Signatures))
for _, s := range res.Signatures {
// ECDSA signature + a byte for the index of the guardian in the guardian set
signature := fmt.Sprintf("%s%02x", s.Signature, uint8(s.Index))
signatures = append(signatures, signature)
}
w.Header().Add("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(&queryResponse{
Signatures: signatures,
Bytes: hex.EncodeToString(resBytes),
})
if err != nil {
s.logger.Error("failed to encode response", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
s.pendingResponses.Remove(pendingResponse)
}
func (s *httpServer) handleHealth(w http.ResponseWriter, r *http.Request) {
s.logger.Debug("health check")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")
}
func NewHTTPServer(addr string, t *pubsub.Topic, permissions Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment) *http.Server {
s := &httpServer{
topic: t,
permissions: permissions,
signerKey: signerKey,
pendingResponses: p,
logger: logger,
env: env,
}
r := mux.NewRouter()
r.HandleFunc("/v1/query", s.handleQuery).Methods("PUT", "POST", "OPTIONS")
r.HandleFunc("/v1/health", s.handleHealth).Methods("GET")
return &http.Server{
Addr: addr,
Handler: r,
ReadHeaderTimeout: 5 * time.Second,
}
}

195
node/cmd/ccq/p2p.go Normal file
View File

@ -0,0 +1,195 @@
package ccq
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"time"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
type GuardianSignature struct {
Index int
Signature string
}
type SignedResponse struct {
Response *query.QueryResponsePublication
Signatures []GuardianSignature
}
type P2PSub struct {
sub *pubsub.Subscription
topic_req *pubsub.Topic
topic_resp *pubsub.Topic
host host.Host
}
func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrapPeers, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger) (*P2PSub, error) {
// p2p setup
components := p2p.DefaultComponents()
components.Port = port
h, err := p2p.NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
if err != nil {
return nil, err
}
topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
// Comment from security team in PR #2981: CCQServers should have a parameter of D = 36, Dlo = 19, Dhi = 40, Dout = 18 such that they can reach all Guardians directly.
gossipParams := pubsub.DefaultGossipSubParams()
gossipParams.D = 36
gossipParams.Dlo = 19
gossipParams.Dhi = 40
gossipParams.Dout = 18
ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithGossipSubParams(gossipParams))
if err != nil {
logger.Error("failed to create gossip subscription", zap.Error(err))
return nil, err
}
th_req, err := ps.Join(topic_req)
if err != nil {
logger.Error("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
return nil, err
}
th_resp, err := ps.Join(topic_resp)
if err != nil {
logger.Error("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
return nil, err
}
sub, err := th_resp.Subscribe()
if err != nil {
logger.Error("failed to subscribe to response topic", zap.Error(err))
return nil, err
}
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
// Wait for peers
for len(th_req.ListPeers()) < 1 {
time.Sleep(time.Millisecond * 100)
}
// Fetch the initial current guardian set
guardianSet, err := FetchCurrentGuardianSet(ethRpcUrl, ethCoreAddr)
if err != nil {
logger.Fatal("Failed to fetch current guardian set", zap.Error(err))
}
quorum := vaa.CalculateQuorum(len(guardianSet.Keys))
// Listen to the p2p network for query responses
go func() {
// Maps the request signature to a map of response digests which maps to a list of guardian signatures.
// A request could have responses with different digests, because the guardians could have
// different results returned for the query in the event of a rollback.
responses := make(map[string]map[ethCommon.Hash][]GuardianSignature)
for {
envelope, err := sub.Next(ctx)
if err != nil {
logger.Fatal("Failed to read next pubsub message", zap.Error(err))
}
var msg gossipv1.GossipMessage
err = proto.Unmarshal(envelope.Data, &msg)
if err != nil {
logger.Error("received invalid message", zap.Binary("data", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
continue
}
switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedQueryResponse:
logger.Debug("query response received", zap.Any("response", m.SignedQueryResponse))
var queryResponse query.QueryResponsePublication
err := queryResponse.Unmarshal(m.SignedQueryResponse.QueryResponse)
if err != nil {
logger.Error("failed to unmarshal response", zap.Error(err))
continue
}
requestSignature := hex.EncodeToString(queryResponse.Request.Signature)
// Check that we're handling the request for this response
pendingResponse := pendingResponses.Get(requestSignature)
if pendingResponse == nil {
logger.Debug("skipping query response for unknown request", zap.String("signature", requestSignature))
continue
}
// Make sure that the request bytes match
if !bytes.Equal(queryResponse.Request.QueryRequest, pendingResponse.req.QueryRequest) ||
!bytes.Equal(queryResponse.Request.Signature, pendingResponse.req.Signature) {
continue
}
digest := query.GetQueryResponseDigestFromBytes(m.SignedQueryResponse.QueryResponse)
signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), m.SignedQueryResponse.Signature)
if err != nil {
logger.Error("failed to verify signature on response",
zap.String("digest", digest.Hex()),
zap.String("signature", hex.EncodeToString(m.SignedQueryResponse.Signature)),
zap.Error(err))
continue
}
signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
keyIdx, hasKeyIdx := guardianSet.KeyIndex(signerAddress)
if hasKeyIdx {
if _, ok := responses[requestSignature]; !ok {
responses[requestSignature] = make(map[ethCommon.Hash][]GuardianSignature)
}
found := false
for _, gs := range responses[requestSignature][digest] {
if gs.Index == keyIdx {
found = true
break
}
}
if found {
// Already handled the response from this guardian
continue
}
responses[requestSignature][digest] = append(responses[requestSignature][digest], GuardianSignature{
Index: keyIdx,
Signature: hex.EncodeToString(m.SignedQueryResponse.Signature),
})
// quorum is reached when a super-majority of guardians have signed a response with the same digest
if len(responses[requestSignature][digest]) >= quorum {
s := &SignedResponse{
Response: &queryResponse,
Signatures: responses[requestSignature][digest],
}
delete(responses, requestSignature)
pendingResponse.ch <- s
}
} else {
logger.Warn("received observation by unknown guardian - is our guardian set outdated?",
zap.String("digest", digest.Hex()), zap.String("address", signerAddress.Hex()),
)
}
}
}
}()
return &P2PSub{
sub: sub,
topic_req: th_req,
topic_resp: th_resp,
host: h,
}, nil
}

View File

@ -0,0 +1,273 @@
package ccq
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParseConfigFileDoesntExist(t *testing.T) {
_, err := parseConfigFile("missingFile.json")
require.Error(t, err)
assert.Equal(t, `failed to open permissions file "missingFile.json": open missingFile.json: no such file or directory`, err.Error())
}
func TestParseConfigBadJson(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
},
{
"userName": "Test User",
"apiKey": "my_secret_key_2",
"allowUnsigned": true,
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "0xB4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `failed to unmarshal json: unexpected end of JSON input`, err.Error())
}
func TestParseConfigDuplicateUser(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
},
{
"userName": "Test User",
"apiKey": "my_secret_key_2",
"allowUnsigned": true,
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "0xB4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `UserName "Test User" is a duplicate`, err.Error())
}
func TestParseConfigDuplicateApiKey(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User 1",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
},
{
"userName": "Test User 2",
"apiKey": "my_secret_key",
"allowUnsigned": true,
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "0xB4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `API key "my_secret_key" is a duplicate`, err.Error())
}
func TestParseConfigUnsupportedCallType(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"badCallType": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `unsupported call type for user "Test User", must be "ethCall"`, err.Error())
}
func TestParseConfigInvalidContractAddress(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "HelloWorld",
"call": "0x06fdde"
}
}
]
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `invalid contract address "HelloWorld" for user "Test User"`, err.Error())
}
func TestParseConfigInvalidEthCall(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "HelloWorld"
}
}
]
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `invalid eth call "HelloWorld" for user "Test User"`, err.Error())
}
func TestParseConfigInvalidEthCallLength(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fd"
}
}
]
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `eth call "0x06fd" for user "Test User" has an invalid length, must be 4 bytes`, err.Error())
}
func TestParseConfigDuplicateAllowedCallForUser(t *testing.T) {
str := `
{
"permissions": [
{
"userName": "Test User",
"apiKey": "my_secret_key",
"allowedCalls": [
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
},
{
"ethCall": {
"note:": "Name of WETH on Goerli",
"chain": 2,
"contractAddress": "B4FBF271143F4FBf7B91A5ded31805e42b2208d6",
"call": "0x06fdde03"
}
}
]
}
]
}`
_, err := parseConfig([]byte(str))
require.Error(t, err)
assert.Equal(t, `"ethCall:2:000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d6:06fdde03" is a duplicate allowed call for user "Test User"`, err.Error())
}

View File

@ -0,0 +1,60 @@
package ccq
import (
"encoding/hex"
"sync"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
)
type PendingResponse struct {
req *gossipv1.SignedQueryRequest
ch chan *SignedResponse
}
func NewPendingResponse(req *gossipv1.SignedQueryRequest) *PendingResponse {
return &PendingResponse{
req: req,
ch: make(chan *SignedResponse),
}
}
type PendingResponses struct {
pendingResponses map[string]*PendingResponse
mu sync.RWMutex
}
func NewPendingResponses() *PendingResponses {
return &PendingResponses{
pendingResponses: make(map[string]*PendingResponse),
}
}
func (p *PendingResponses) Add(r *PendingResponse) bool {
signature := hex.EncodeToString(r.req.Signature)
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.pendingResponses[signature]; ok {
// the request w/ this signature is already being handled
// don't overwrite
return false
}
p.pendingResponses[signature] = r
return true
}
func (p *PendingResponses) Get(signature string) *PendingResponse {
p.mu.RLock()
defer p.mu.RUnlock()
if r, ok := p.pendingResponses[signature]; ok {
return r
}
return nil
}
func (p *PendingResponses) Remove(r *PendingResponse) {
signature := hex.EncodeToString(r.req.Signature)
p.mu.Lock()
defer p.mu.Unlock()
delete(p.pendingResponses, signature)
}

View File

@ -0,0 +1,184 @@
// Note: To generate a signer key file do: guardiand keygen --block-type "CCQ SERVER SIGNING KEY" /path/to/key/file
// You will need to add this key to ccqAllowedRequesters in the guardian configs.
package ccq
import (
"context"
"crypto/ecdsa"
"fmt"
"net/http"
"os"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/telemetry"
"github.com/certusone/wormhole/node/pkg/version"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
const CCQ_SERVER_SIGNING_KEY = "CCQ SERVER SIGNING KEY"
var (
envStr *string
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
listenAddr *string
nodeKeyPath *string
signerKeyPath *string
permFile *string
ethRPC *string
ethContract *string
logLevel *string
telemetryLokiURL *string
telemetryNodeName *string
)
const DEV_NETWORK_ID = "/wormhole/dev"
func init() {
envStr = QueryServerCmd.Flags().String("env", "", "environment (dev, test, prod)")
p2pNetworkID = QueryServerCmd.Flags().String("network", DEV_NETWORK_ID, "P2P network identifier")
p2pPort = QueryServerCmd.Flags().Uint("port", 8995, "P2P UDP listener port")
p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
signerKeyPath = QueryServerCmd.Flags().String("signerKey", "", "Path to key used to sign unsigned queries")
listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)")
permFile = QueryServerCmd.Flags().String("permFile", "", "JSON file containing permissions configuration")
ethRPC = QueryServerCmd.Flags().String("ethRPC", "", "Ethereum RPC for fetching current guardian set")
ethContract = QueryServerCmd.Flags().String("ethContract", "", "Ethereum core bridge address for fetching current guardian set")
logLevel = QueryServerCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
telemetryLokiURL = QueryServerCmd.Flags().String("telemetryLokiURL", "", "Loki cloud logging URL")
telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
}
var QueryServerCmd = &cobra.Command{
Use: "query-server",
Short: "Run the cross-chain query server",
Run: runQueryServer,
}
func runQueryServer(cmd *cobra.Command, args []string) {
common.SetRestrictiveUmask()
// Setup logging
lvl, err := ipfslog.LevelFromString(*logLevel)
if err != nil {
fmt.Println("Invalid log level")
os.Exit(1)
}
logger := ipfslog.Logger("query-server").Desugar()
ipfslog.SetAllLoggers(lvl)
if *telemetryLokiURL != "" {
logger.Info("Using Loki telemetry logger")
if *telemetryNodeName == "" {
logger.Fatal("if --telemetryLokiURL is specified --telemetryNodeName must be specified")
}
labels := map[string]string{
"network": *p2pNetworkID,
"node_name": *telemetryNodeName,
"version": version.Version(),
}
tm, err := telemetry.NewLokiCloudLogger(context.Background(), logger, *telemetryLokiURL, "ccq_server", true, labels)
if err != nil {
logger.Fatal("Failed to initialize telemetry", zap.Error(err))
}
defer tm.Close()
logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger
}
env, err := common.ParseEnvironment(*envStr)
if err != nil || (env != common.UnsafeDevNet && env != common.TestNet && env != common.MainNet) {
if *envStr == "" {
logger.Fatal("Please specify --env")
}
logger.Fatal("Invalid value for --env, must be dev, test or prod", zap.String("val", *envStr))
}
if *p2pNetworkID == DEV_NETWORK_ID && env != common.UnsafeDevNet {
logger.Fatal("May not set --network to dev unless --env is also dev", zap.String("network", *p2pNetworkID), zap.String("env", *envStr))
}
// Verify flags
if *nodeKeyPath == "" {
logger.Fatal("Please specify --nodeKey")
}
if *p2pBootstrap == "" {
logger.Fatal("Please specify --bootstrap")
}
if *permFile == "" {
logger.Fatal("Please specify --permFile")
}
if *ethRPC == "" {
logger.Fatal("Please specify --ethRPC")
}
if *ethContract == "" {
logger.Fatal("Please specify --ethContract")
}
permissions, err := parseConfigFile(*permFile)
if err != nil {
logger.Fatal("Failed to load permissions file", zap.String("permFile", *permFile), zap.Error(err))
}
// Load p2p private key
var priv crypto.PrivKey
priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
if err != nil {
logger.Fatal("Failed to load node key", zap.Error(err))
}
var signerKey *ecdsa.PrivateKey
if *signerKeyPath != "" {
signerKey, err = common.LoadArmoredKey(*signerKeyPath, CCQ_SERVER_SIGNING_KEY, false)
if err != nil {
logger.Fatal("Failed to loader signer key", zap.Error(err))
}
logger.Info("will sign unsigned requests if api key supports it", zap.Stringer("signingKey", ethCrypto.PubkeyToAddress(signerKey.PublicKey)))
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run p2p
pendingResponses := NewPendingResponses()
p2p, err := runP2P(ctx, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger)
if err != nil {
logger.Fatal("Failed to start p2p", zap.Error(err))
}
// Start the HTTP server
go func() {
s := NewHTTPServer(*listenAddr, p2p.topic_req, permissions, signerKey, pendingResponses, logger, env)
logger.Sugar().Infof("Server listening on %s", *listenAddr)
err := s.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logger.Fatal("Server closed unexpectedly", zap.Error(err))
}
}()
<-ctx.Done()
logger.Info("Context cancelled, exiting...")
// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
p2p.sub.Cancel()
if err := p2p.topic_req.Close(); err != nil {
logger.Error("Error closing the request topic", zap.Error(err))
}
if err := p2p.topic_resp.Close(); err != nil {
logger.Error("Error closing the response topic", zap.Error(err))
}
if err := p2p.host.Close(); err != nil {
logger.Error("Error closing the host", zap.Error(err))
}
}

251
node/cmd/ccq/utils.go Normal file
View File

@ -0,0 +1,251 @@
package ccq
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
eth_common "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethRpc "github.com/ethereum/go-ethereum/rpc"
)
func FetchCurrentGuardianSet(rpcUrl, coreAddr string) (*common.GuardianSet, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
ethContract := eth_common.HexToAddress(coreAddr)
rawClient, err := ethRpc.DialContext(ctx, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to connect to ethereum")
}
client := ethClient.NewClient(rawClient)
caller, err := ethAbi.NewAbiCaller(ethContract, client)
if err != nil {
return nil, fmt.Errorf("failed to create caller")
}
currentIndex, err := caller.GetCurrentGuardianSetIndex(&ethBind.CallOpts{Context: ctx})
if err != nil {
return nil, fmt.Errorf("error requesting current guardian set index: %w", err)
}
gs, err := caller.GetGuardianSet(&ethBind.CallOpts{Context: ctx}, currentIndex)
if err != nil {
return nil, fmt.Errorf("error requesting current guardian set value: %w", err)
}
return &common.GuardianSet{
Keys: gs.Keys,
Index: currentIndex,
}, nil
}
type Config struct {
Permissions []User `json:"Permissions"`
}
type User struct {
UserName string `json:"userName"`
ApiKey string `json:"apiKey"`
AllowUnsigned bool `json:"allowUnsigned"`
AllowedCalls []AllowedCall `json:"allowedCalls"`
}
type AllowedCall struct {
EthCall *EthCall `json:"ethCall"`
}
type EthCall struct {
Chain int `json:"chain"`
ContractAddress string `json:"contractAddress"`
Call string `json:"call"`
}
type Permissions map[string]*permissionEntry
type permissionEntry struct {
userName string
apiKey string
allowUnsigned bool
allowedCalls allowedCallsForUser // Key is something like "ethCall:2:000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d6:06fdde03"
}
type allowedCallsForUser map[string]struct{}
const ETH_CALL_SIG_LENGTH = 4
// parseConfigFile parses the permissions config file into a map keyed by API key.
func parseConfigFile(fileName string) (Permissions, error) {
jsonFile, err := os.Open(fileName)
if err != nil {
return nil, fmt.Errorf(`failed to open permissions file "%s": %w`, fileName, err)
}
defer jsonFile.Close()
byteValue, err := io.ReadAll(jsonFile)
if err != nil {
return nil, fmt.Errorf(`failed to read permissions file "%s": %w`, fileName, err)
}
retVal, err := parseConfig(byteValue)
if err != nil {
return retVal, fmt.Errorf(`failed to parse permissions file "%s": %w`, fileName, err)
}
return retVal, err
}
// parseConfig parses the permissions config from a buffer into a map keyed by API key.
func parseConfig(byteValue []byte) (Permissions, error) {
var config Config
if err := json.Unmarshal(byteValue, &config); err != nil {
return nil, fmt.Errorf(`failed to unmarshal json: %w`, err)
}
ret := make(Permissions)
userNames := map[string]struct{}{}
for _, user := range config.Permissions {
// Since we log user names in all our error messages, make sure they are unique.
if _, exists := userNames[user.UserName]; exists {
return nil, fmt.Errorf(`UserName "%s" is a duplicate`, user.UserName)
}
userNames[user.UserName] = struct{}{}
apiKey := strings.ToLower(user.ApiKey)
if _, exists := ret[apiKey]; exists {
return nil, fmt.Errorf(`API key "%s" is a duplicate`, apiKey)
}
// Build the list of allowed calls for this API key.
allowedCalls := make(allowedCallsForUser)
for _, ac := range user.AllowedCalls {
var callKey string
if ac.EthCall == nil {
return nil, fmt.Errorf(`unsupported call type for user "%s", must be "ethCall"`, user.UserName)
}
// Convert the contract address into a standard format like "000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d6".
contractAddress, err := vaa.StringToAddress(ac.EthCall.ContractAddress)
if err != nil {
return nil, fmt.Errorf(`invalid contract address "%s" for user "%s"`, ac.EthCall.ContractAddress, user.UserName)
}
// The call should be the ABI four byte hex hash of the function signature. Parse it into a standard form of "06fdde03".
call, err := hex.DecodeString(strings.TrimPrefix(ac.EthCall.Call, "0x"))
if err != nil {
return nil, fmt.Errorf(`invalid eth call "%s" for user "%s"`, ac.EthCall.Call, user.UserName)
}
if len(call) != ETH_CALL_SIG_LENGTH {
return nil, fmt.Errorf(`eth call "%s" for user "%s" has an invalid length, must be %d bytes`, ac.EthCall.Call, user.UserName, ETH_CALL_SIG_LENGTH)
}
// The permission key is the chain, contract address and call formatted as a colon separated string.
callKey = fmt.Sprintf("ethCall:%d:%s:%s", ac.EthCall.Chain, contractAddress, hex.EncodeToString(call))
if _, exists := allowedCalls[callKey]; exists {
return nil, fmt.Errorf(`"%s" is a duplicate allowed call for user "%s"`, callKey, user.UserName)
}
allowedCalls[callKey] = struct{}{}
}
pe := &permissionEntry{
userName: user.UserName,
apiKey: apiKey,
allowUnsigned: user.AllowUnsigned,
allowedCalls: allowedCalls,
}
ret[apiKey] = pe
}
return ret, nil
}
// validateRequest verifies that this API key is allowed to do all of the calls in this request. In the case of an error, it returns the HTTP status.
func validateRequest(logger *zap.Logger, env common.Environment, perms Permissions, signerKey *ecdsa.PrivateKey, apiKey string, qr *gossipv1.SignedQueryRequest) (int, error) {
apiKey = strings.ToLower(apiKey)
permsForUser, exists := perms[strings.ToLower(apiKey)]
if !exists {
logger.Debug("invalid api key", zap.String("apiKey", apiKey))
return http.StatusForbidden, fmt.Errorf("invalid api key")
}
// TODO: Should we verify the signatures?
if len(qr.Signature) == 0 {
if !permsForUser.allowUnsigned || signerKey == nil {
logger.Debug("request not signed and unsigned requests not supported for this user",
zap.String("userName", permsForUser.userName),
zap.Bool("allowUnsigned", permsForUser.allowUnsigned),
zap.Bool("signerKeyConfigured", signerKey != nil),
)
return http.StatusBadRequest, fmt.Errorf("request not signed")
}
// Sign the request using our key.
var err error
digest := query.QueryRequestDigest(env, qr.QueryRequest)
qr.Signature, err = ethCrypto.Sign(digest.Bytes(), signerKey)
if err != nil {
logger.Debug("failed to sign request", zap.String("userName", permsForUser.userName), zap.Error(err))
return http.StatusInternalServerError, fmt.Errorf("failed to sign request: %w", err)
}
}
var queryRequest query.QueryRequest
err := queryRequest.Unmarshal(qr.QueryRequest)
if err != nil {
logger.Debug("failed to unmarshal request", zap.String("userName", permsForUser.userName), zap.Error(err))
return http.StatusInternalServerError, fmt.Errorf("failed to unmarshal request: %w", err)
}
// Make sure the overall query request is sane.
if err := queryRequest.Validate(); err != nil {
logger.Debug("failed to validate request", zap.String("userName", permsForUser.userName), zap.Error(err))
return http.StatusBadRequest, fmt.Errorf("failed to validate request: %w", err)
}
// Make sure they are allowed to make all of the calls that they are asking for.
for _, pcq := range queryRequest.PerChainQueries {
switch q := pcq.Query.(type) {
case *query.EthCallQueryRequest:
for _, callData := range q.CallData {
contractAddress, err := vaa.BytesToAddress(callData.To)
if err != nil {
logger.Debug("failed to parse contract address", zap.String("userName", permsForUser.userName), zap.String("contract", hex.EncodeToString(callData.To)), zap.Error(err))
return http.StatusBadRequest, fmt.Errorf("failed to parse contract address: %w", err)
}
if len(callData.Data) < ETH_CALL_SIG_LENGTH {
logger.Debug("eth call data must be at least four bytes", zap.String("userName", permsForUser.userName), zap.String("data", hex.EncodeToString(callData.Data)))
return http.StatusBadRequest, fmt.Errorf("eth call data must be at least four bytes")
}
call := hex.EncodeToString(callData.Data[0:ETH_CALL_SIG_LENGTH])
callKey := fmt.Sprintf("ethCall:%d:%s:%s", pcq.ChainId, contractAddress, call)
if _, exists := permsForUser.allowedCalls[callKey]; !exists {
logger.Debug("requested call not authorized", zap.String("userName", permsForUser.userName), zap.String("callKey", callKey))
return http.StatusBadRequest, fmt.Errorf(`call "%s" not authorized`, callKey)
}
}
default:
logger.Debug("unsupported query type", zap.String("userName", permsForUser.userName), zap.Any("type", pcq.Query))
return http.StatusBadRequest, fmt.Errorf("unsupported query type")
}
}
logger.Debug("submitting query request", zap.String("userName", permsForUser.userName))
return http.StatusOK, nil
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"github.com/certusone/wormhole/node/cmd/ccq"
"github.com/certusone/wormhole/node/cmd/debug"
"github.com/certusone/wormhole/node/cmd/spy"
"github.com/certusone/wormhole/node/pkg/version"
@ -48,6 +49,7 @@ func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.guardiand.yaml)")
rootCmd.AddCommand(guardiand.NodeCmd)
rootCmd.AddCommand(spy.SpyCmd)
rootCmd.AddCommand(ccq.QueryServerCmd)
rootCmd.AddCommand(guardiand.KeygenCmd)
rootCmd.AddCommand(guardiand.AdminCmd)
rootCmd.AddCommand(guardiand.TemplateCmd)