2023-10-12 12:29:21 -07:00
// Note: To generate a signer key file, run: guardiand keygen --block-type "CCQ SERVER SIGNING KEY" /path/to/key/file
// You will then need to add this key to ccqAllowedRequesters in the guardian configs.
package ccq
import (
"context"
"crypto/ecdsa"
"fmt"
"net/http"
"os"
2023-11-09 11:43:48 -08:00
"os/signal"
"syscall"
2023-12-11 14:00:46 -08:00
"time"
2023-10-12 12:29:21 -07:00
"github.com/certusone/wormhole/node/pkg/common"
2024-03-22 12:27:03 -07:00
"github.com/certusone/wormhole/node/pkg/p2p"
2023-10-12 12:29:21 -07:00
"github.com/certusone/wormhole/node/pkg/telemetry"
2023-11-03 11:06:18 -07:00
promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
2023-10-12 12:29:21 -07:00
"github.com/certusone/wormhole/node/pkg/version"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
const (
	// CCQ_SERVER_SIGNING_KEY is the armored-key block type expected in the
	// --signerKey file (generate one with `guardiand keygen --block-type`).
	CCQ_SERVER_SIGNING_KEY = "CCQ SERVER SIGNING KEY"

	// DEV_NETWORK_ID is the devnet p2p network ID. runQueryServer refuses to
	// use it unless --env is devnet.
	DEV_NETWORK_ID = "/wormhole/dev"
)

// Command-line flag values for the query-server command. Each pointer is
// bound to its flag in init and dereferenced by runQueryServer.
var (
	// Environment and p2p configuration.
	envStr                 *string
	p2pNetworkID           *string
	p2pPort                *uint
	p2pBootstrap           *string
	gossipAdvertiseAddress *string
	monitorPeers           *bool
	nodeKeyPath            *string

	// Query serving, permissions, signing and guardian-set lookup.
	listenAddr    *string
	signerKeyPath *string
	permFile      *string
	allowAnything *bool
	ethRPC        *string
	ethContract   *string

	// Logging, telemetry and status reporting.
	logLevel          *string
	telemetryLokiURL  *string
	telemetryNodeName *string
	statusAddr        *string
	promRemoteURL     *string

	// Graceful shutdown timing, in seconds.
	shutdownDelay1 *uint
	shutdownDelay2 *uint
)
func init ( ) {
2024-03-22 12:27:03 -07:00
envStr = QueryServerCmd . Flags ( ) . String ( "env" , "" , "environment (devnet, testnet, mainnet)" )
p2pNetworkID = QueryServerCmd . Flags ( ) . String ( "network" , "" , "P2P network identifier (optional, overrides default for environment)" )
2023-10-12 12:29:21 -07:00
p2pPort = QueryServerCmd . Flags ( ) . Uint ( "port" , 8995 , "P2P UDP listener port" )
2024-03-22 12:27:03 -07:00
p2pBootstrap = QueryServerCmd . Flags ( ) . String ( "bootstrap" , "" , "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)" )
2023-10-12 12:29:21 -07:00
nodeKeyPath = QueryServerCmd . Flags ( ) . String ( "nodeKey" , "" , "Path to node key (will be generated if it doesn't exist)" )
signerKeyPath = QueryServerCmd . Flags ( ) . String ( "signerKey" , "" , "Path to key used to sign unsigned queries" )
listenAddr = QueryServerCmd . Flags ( ) . String ( "listenAddr" , "[::]:6069" , "Listen address for query server (disabled if blank)" )
permFile = QueryServerCmd . Flags ( ) . String ( "permFile" , "" , "JSON file containing permissions configuration" )
ethRPC = QueryServerCmd . Flags ( ) . String ( "ethRPC" , "" , "Ethereum RPC for fetching current guardian set" )
ethContract = QueryServerCmd . Flags ( ) . String ( "ethContract" , "" , "Ethereum core bridge address for fetching current guardian set" )
logLevel = QueryServerCmd . Flags ( ) . String ( "logLevel" , "info" , "Logging level (debug, info, warn, error, dpanic, panic, fatal)" )
telemetryLokiURL = QueryServerCmd . Flags ( ) . String ( "telemetryLokiURL" , "" , "Loki cloud logging URL" )
telemetryNodeName = QueryServerCmd . Flags ( ) . String ( "telemetryNodeName" , "" , "Node name used in telemetry" )
2023-11-08 09:22:34 -08:00
statusAddr = QueryServerCmd . Flags ( ) . String ( "statusAddr" , "[::]:6060" , "Listen address for status server (disabled if blank)" )
2023-11-03 11:06:18 -07:00
promRemoteURL = QueryServerCmd . Flags ( ) . String ( "promRemoteURL" , "" , "Prometheus remote write URL (Grafana)" )
2023-12-21 12:28:15 -08:00
monitorPeers = QueryServerCmd . Flags ( ) . Bool ( "monitorPeers" , false , "Should monitor bootstrap peers and attempt to reconnect" )
2024-05-02 13:14:14 -07:00
gossipAdvertiseAddress = QueryServerCmd . Flags ( ) . String ( "gossipAdvertiseAddress" , "" , "External IP to advertize on P2P (use if behind a NAT or running in k8s)" )
2024-06-07 06:13:07 -07:00
allowAnything = QueryServerCmd . Flags ( ) . Bool ( "allowAnything" , false , ` Should allow API keys with the "allowAnything" flag (only allowed in testnet and devnet) ` )
2023-12-11 14:00:46 -08:00
// The default health check monitoring is every five seconds, with a five second timeout, and you have to miss two, for 20 seconds total.
shutdownDelay1 = QueryServerCmd . Flags ( ) . Uint ( "shutdownDelay1" , 25 , "Seconds to delay after disabling health check on shutdown" )
// The guardians will wait up to 60 seconds before giving up on a request.
shutdownDelay2 = QueryServerCmd . Flags ( ) . Uint ( "shutdownDelay2" , 65 , "Seconds to wait after delay1 for pending requests to complete" )
2023-10-12 12:29:21 -07:00
}
// QueryServerCmd is the cobra command that runs the cross-chain query server.
// Its flags are registered in init and its work is done by runQueryServer.
var QueryServerCmd = &cobra.Command{
	Run:   runQueryServer,
	Short: "Run the cross-chain query server",
	Use:   "query-server",
}
func runQueryServer ( cmd * cobra . Command , args [ ] string ) {
common . SetRestrictiveUmask ( )
// Setup logging
lvl , err := ipfslog . LevelFromString ( * logLevel )
if err != nil {
fmt . Println ( "Invalid log level" )
os . Exit ( 1 )
}
logger := ipfslog . Logger ( "query-server" ) . Desugar ( )
ipfslog . SetAllLoggers ( lvl )
2024-03-22 12:27:03 -07:00
env , err := common . ParseEnvironment ( * envStr )
if err != nil || ( env != common . UnsafeDevNet && env != common . TestNet && env != common . MainNet ) {
if * envStr == "" {
logger . Fatal ( "Please specify --env" )
}
logger . Fatal ( "Invalid value for --env, should be devnet, testnet or mainnet" , zap . String ( "val" , * envStr ) )
}
if * p2pNetworkID == "" {
* p2pNetworkID = p2p . GetNetworkId ( env )
} else if env != common . UnsafeDevNet {
logger . Warn ( "overriding default p2p network ID" , zap . String ( "p2pNetworkID" , * p2pNetworkID ) )
}
if * p2pNetworkID == DEV_NETWORK_ID && env != common . UnsafeDevNet {
logger . Fatal ( "May not set --network to dev unless --env is also dev" , zap . String ( "network" , * p2pNetworkID ) , zap . String ( "env" , * envStr ) )
}
networkID := * p2pNetworkID + "/ccq"
if * p2pBootstrap == "" {
* p2pBootstrap , err = p2p . GetCcqBootstrapPeers ( env )
if err != nil {
logger . Fatal ( "failed to determine the bootstrap peers from the environment" , zap . String ( "env" , string ( env ) ) , zap . Error ( err ) )
}
} else if env != common . UnsafeDevNet {
logger . Warn ( "overriding default p2p bootstrap peers" , zap . String ( "p2pBootstrap" , * p2pBootstrap ) )
}
2023-10-12 12:29:21 -07:00
if * telemetryLokiURL != "" {
logger . Info ( "Using Loki telemetry logger" )
if * telemetryNodeName == "" {
logger . Fatal ( "if --telemetryLokiURL is specified --telemetryNodeName must be specified" )
}
labels := map [ string ] string {
2023-11-15 08:39:00 -08:00
"network" : * p2pNetworkID ,
2023-10-12 12:29:21 -07:00
"node_name" : * telemetryNodeName ,
"version" : version . Version ( ) ,
}
tm , err := telemetry . NewLokiCloudLogger ( context . Background ( ) , logger , * telemetryLokiURL , "ccq_server" , true , labels )
if err != nil {
logger . Fatal ( "Failed to initialize telemetry" , zap . Error ( err ) )
}
defer tm . Close ( )
logger = tm . WrapLogger ( logger ) // Wrap logger with telemetry logger
}
// Verify flags
if * nodeKeyPath == "" {
logger . Fatal ( "Please specify --nodeKey" )
}
if * p2pBootstrap == "" {
logger . Fatal ( "Please specify --bootstrap" )
}
if * permFile == "" {
logger . Fatal ( "Please specify --permFile" )
}
if * ethRPC == "" {
logger . Fatal ( "Please specify --ethRPC" )
}
if * ethContract == "" {
logger . Fatal ( "Please specify --ethContract" )
}
2024-06-07 06:13:07 -07:00
if * allowAnything {
if env != common . TestNet && env != common . UnsafeDevNet {
logger . Fatal ( ` The "--allowAnything" flag is only supported in testnet and devnet ` )
}
logger . Info ( "will allow anything for users for which it is enabled" )
}
permissions , err := NewPermissions ( * permFile , * allowAnything )
2023-10-12 12:29:21 -07:00
if err != nil {
logger . Fatal ( "Failed to load permissions file" , zap . String ( "permFile" , * permFile ) , zap . Error ( err ) )
}
2024-01-29 12:00:54 -08:00
loggingMap := NewLoggingMap ( )
2023-10-12 12:29:21 -07:00
// Load p2p private key
var priv crypto . PrivKey
priv , err = common . GetOrCreateNodeKey ( logger , * nodeKeyPath )
if err != nil {
logger . Fatal ( "Failed to load node key" , zap . Error ( err ) )
}
var signerKey * ecdsa . PrivateKey
if * signerKeyPath != "" {
signerKey , err = common . LoadArmoredKey ( * signerKeyPath , CCQ_SERVER_SIGNING_KEY , false )
if err != nil {
logger . Fatal ( "Failed to loader signer key" , zap . Error ( err ) )
}
logger . Info ( "will sign unsigned requests if api key supports it" , zap . Stringer ( "signingKey" , ethCrypto . PubkeyToAddress ( signerKey . PublicKey ) ) )
}
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
// Run p2p
2024-03-14 07:39:07 -07:00
pendingResponses := NewPendingResponses ( logger )
2024-05-02 13:14:14 -07:00
p2p , err := runP2P ( ctx , priv , * p2pPort , networkID , * p2pBootstrap , * ethRPC , * ethContract , pendingResponses , logger , * monitorPeers , loggingMap , * gossipAdvertiseAddress )
2023-10-12 12:29:21 -07:00
if err != nil {
logger . Fatal ( "Failed to start p2p" , zap . Error ( err ) )
}
// Start the HTTP server
go func ( ) {
2024-01-29 12:00:54 -08:00
s := NewHTTPServer ( * listenAddr , p2p . topic_req , permissions , signerKey , pendingResponses , logger , env , loggingMap )
2023-10-12 12:29:21 -07:00
logger . Sugar ( ) . Infof ( "Server listening on %s" , * listenAddr )
err := s . ListenAndServe ( )
if err != nil && err != http . ErrServerClosed {
logger . Fatal ( "Server closed unexpectedly" , zap . Error ( err ) )
}
} ( )
2023-11-08 09:22:34 -08:00
// Start the status server
2023-12-11 14:00:46 -08:00
var statServer * statusServer
2023-11-08 09:22:34 -08:00
if * statusAddr != "" {
2023-12-11 14:00:46 -08:00
statServer = NewStatusServer ( * statusAddr , logger , env )
2023-11-08 09:22:34 -08:00
go func ( ) {
logger . Sugar ( ) . Infof ( "Status server listening on %s" , * statusAddr )
2023-12-11 14:00:46 -08:00
err := statServer . httpServer . ListenAndServe ( )
2023-11-08 09:22:34 -08:00
if err != nil && err != http . ErrServerClosed {
logger . Fatal ( "Status server closed unexpectedly" , zap . Error ( err ) )
}
} ( )
}
2023-11-03 11:06:18 -07:00
// Start the Prometheus scraper
usingPromRemoteWrite := * promRemoteURL != ""
if usingPromRemoteWrite {
var info promremotew . PromTelemetryInfo
info . PromRemoteURL = * promRemoteURL
info . Labels = map [ string ] string {
"node_name" : * telemetryNodeName ,
"network" : * p2pNetworkID ,
"version" : version . Version ( ) ,
"product" : "ccq_server" ,
}
err := RunPrometheusScraper ( ctx , logger , info )
if err != nil {
logger . Fatal ( "Failed to start prometheus scraper" , zap . Error ( err ) )
}
}
2023-11-09 11:43:48 -08:00
// Handle SIGTERM
sigterm := make ( chan os . Signal , 1 )
signal . Notify ( sigterm , syscall . SIGTERM )
go func ( ) {
<- sigterm
2023-12-11 14:00:46 -08:00
if statServer != nil && * shutdownDelay1 != 0 {
logger . Info ( "Received sigterm. disabling health checks and pausing." )
statServer . disableHealth ( )
time . Sleep ( time . Duration ( * shutdownDelay1 ) * time . Second )
numPending := 0
logger . Info ( "Waiting for any outstanding requests to complete before shutting down." )
for count := 0 ; count < int ( * shutdownDelay2 ) ; count ++ {
time . Sleep ( time . Second )
numPending = pendingResponses . NumPending ( )
if numPending == 0 {
break
}
}
if numPending == 0 {
logger . Info ( "Done waiting. shutting down." )
} else {
logger . Error ( "Gave up waiting for pending requests to finish. shutting down anyway." , zap . Int ( "numStillPending" , numPending ) )
}
} else {
logger . Info ( "Received sigterm. exiting." )
}
2023-11-09 11:43:48 -08:00
cancel ( )
} ( )
2023-12-11 12:44:48 -08:00
// Start watching for permissions file updates.
errC := make ( chan error )
permissions . StartWatcher ( ctx , logger , errC )
2023-10-12 12:29:21 -07:00
2024-01-29 12:00:54 -08:00
// Star logging cleanup process.
loggingMap . Start ( ctx , logger , errC )
2023-12-11 12:44:48 -08:00
// Wait for either a shutdown or a fatal error from the permissions watcher.
select {
case <- ctx . Done ( ) :
logger . Info ( "Context cancelled, exiting..." )
break
case err := <- errC :
logger . Error ( "Encountered an error, exiting" , zap . Error ( err ) )
break
}
// Stop the permissions file watcher.
permissions . StopWatcher ( )
// Shutdown p2p. Without this the same host won't properly discover peers until some timeout
2023-10-12 12:29:21 -07:00
p2p . sub . Cancel ( )
if err := p2p . topic_req . Close ( ) ; err != nil {
logger . Error ( "Error closing the request topic" , zap . Error ( err ) )
}
if err := p2p . topic_resp . Close ( ) ; err != nil {
logger . Error ( "Error closing the response topic" , zap . Error ( err ) )
}
if err := p2p . host . Close ( ) ; err != nil {
logger . Error ( "Error closing the host" , zap . Error ( err ) )
}
}