node: add request logging with --publicRpcLogDetail and --logPublicRpcToTelemetry (#2390)

Release Notes
New guardiand cli options:
--publicRpcLogDetail [string]
none -- no logging of gRPC requests at all
minimal -- only log gRPC methods
full (default) -- additionally log user-agent and gRPC request payload
--logPublicRpcToTelemetry [bool]
false do not send publicrpc logs to Google Cloud Logging
true (default) -- (current behavior)

Co-authored-by: tbjump <>
This commit is contained in:
tbjump 2023-03-01 07:10:04 -08:00 committed by GitHub
parent 3ba494af96
commit 73841556ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 184 additions and 56 deletions

View File

@ -165,6 +165,8 @@ spec:
- /tmp/publicrpc.sock
- --dataDir
- /tmp/data
- --publicRpcLogDetail
- "full"
# - --chainGovernorEnabled=true
# - --logLevel=debug
securityContext:

View File

@ -543,7 +543,7 @@ func adminServiceRunnable(
publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst, gov)
grpcServer := common.NewInstrumentedGRPCServer(logger)
grpcServer := common.NewInstrumentedGRPCServer(logger, common.GrpcLogDetailMinimal)
nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
return supervisor.GRPCServer(grpcServer, l, false), nil

View File

@ -178,7 +178,9 @@ var (
baseRPC *string
baseContract *string
logLevel *string
logLevel *string
publicRpcLogDetailStr *string
publicRpcLogToTelemetry *bool
unsafeDevMode *bool
testnetMode *bool
@ -329,6 +331,8 @@ func init() {
baseContract = NodeCmd.Flags().String("baseContract", "", "Base contract address")
logLevel = NodeCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
publicRpcLogDetailStr = NodeCmd.Flags().String("publicRpcLogDetail", "full", "The detail with which public RPC requests shall be logged (none=no logging, minimal=only log gRPC methods, full=log gRPC method, payload (up to 200 bytes) and user agent (up to 200 bytes))")
publicRpcLogToTelemetry = NodeCmd.Flags().Bool("logPublicRpcToTelemetry", true, "whether or not to include publicRpc request logs in telemetry")
unsafeDevMode = NodeCmd.Flags().Bool("unsafeDevMode", false, "Launch node in unsafe, deterministic devnet mode")
testnetMode = NodeCmd.Flags().Bool("testnetMode", false, "Launch node in testnet mode (enables testnet-only features)")
@ -666,6 +670,19 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Fatal("Please do not specify --baseContract")
}
}
var publicRpcLogDetail common.GrpcLogDetail
switch *publicRpcLogDetailStr {
case "none":
publicRpcLogDetail = common.GrpcLogDetailNone
case "minimal":
publicRpcLogDetail = common.GrpcLogDetailMinimal
case "full":
publicRpcLogDetail = common.GrpcLogDetailFull
default:
logger.Fatal("--publicRpcLogDetail should be one of (none, minimal, full)")
}
if *nodeName == "" {
logger.Fatal("Please specify --nodeName")
}
@ -947,7 +964,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Fatal("Failed to get peer ID from private key", zap.Error(err))
}
tm, err := telemetry.New(context.Background(), telemetryProject, creds, map[string]string{
tm, err := telemetry.New(context.Background(), telemetryProject, creds, *publicRpcLogToTelemetry, map[string]string{
"node_name": *nodeName,
"node_key": peerID.Pretty(),
"guardian_addr": guardianAddr,
@ -1442,7 +1459,7 @@ func runNode(cmd *cobra.Command, args []string) {
if shouldStart(publicGRPCSocketPath) {
// local public grpc service socket
publicrpcUnixService, publicrpcServer, err := publicrpcUnixServiceRunnable(logger, *publicGRPCSocketPath, db, gst, gov)
publicrpcUnixService, publicrpcServer, err := publicrpcUnixServiceRunnable(logger, *publicGRPCSocketPath, publicRpcLogDetail, db, gst, gov)
if err != nil {
logger.Fatal("failed to create publicrpc service socket", zap.Error(err))
}
@ -1452,7 +1469,7 @@ func runNode(cmd *cobra.Command, args []string) {
}
if shouldStart(publicRPC) {
publicrpcService, err := publicrpcTcpServiceRunnable(logger, *publicRPC, db, gst, gov)
publicrpcService, err := publicrpcTcpServiceRunnable(logger, *publicRPC, publicRpcLogDetail, db, gst, gov)
if err != nil {
log.Fatal("failed to create publicrpc tcp service", zap.Error(err))
}

View File

@ -15,7 +15,7 @@ import (
"google.golang.org/grpc"
)
func publicrpcTcpServiceRunnable(logger *zap.Logger, listenAddr string, db *db.Database, gst *common.GuardianSetState, gov *governor.ChainGovernor) (supervisor.Runnable, error) {
func publicrpcTcpServiceRunnable(logger *zap.Logger, listenAddr string, publicRpcLogDetail common.GrpcLogDetail, db *db.Database, gst *common.GuardianSetState, gov *governor.ChainGovernor) (supervisor.Runnable, error) {
l, err := net.Listen("tcp", listenAddr)
if err != nil {
@ -25,13 +25,14 @@ func publicrpcTcpServiceRunnable(logger *zap.Logger, listenAddr string, db *db.D
logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String()))
rpcServer := publicrpc.NewPublicrpcServer(logger, db, gst, gov)
grpcServer := common.NewInstrumentedGRPCServer(logger)
grpcServer := common.NewInstrumentedGRPCServer(logger, publicRpcLogDetail)
publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, rpcServer)
return supervisor.GRPCServer(grpcServer, l, false), nil
}
func publicrpcUnixServiceRunnable(logger *zap.Logger, socketPath string, db *db.Database, gst *common.GuardianSetState, gov *governor.ChainGovernor) (supervisor.Runnable, *grpc.Server, error) {
func publicrpcUnixServiceRunnable(logger *zap.Logger, socketPath string, publicRpcLogDetail common.GrpcLogDetail, db *db.Database, gst *common.GuardianSetState, gov *governor.ChainGovernor) (supervisor.Runnable, *grpc.Server, error) {
// Delete existing UNIX socket, if present.
fi, err := os.Stat(socketPath)
if err == nil {
@ -65,7 +66,7 @@ func publicrpcUnixServiceRunnable(logger *zap.Logger, socketPath string, db *db.
publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst, gov)
grpcServer := common.NewInstrumentedGRPCServer(logger)
grpcServer := common.NewInstrumentedGRPCServer(logger, publicRpcLogDetail)
publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
return supervisor.GRPCServer(grpcServer, l, false), grpcServer, nil
}

View File

@ -75,13 +75,6 @@ func publicwebServiceRunnable(
mux := http.NewServeMux()
grpcWebServer := grpcweb.WrapServer(grpcServer)
mux.Handle("/", allowCORSWrapper(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
logger.Info("public rpc request",
zap.String("method", req.Method),
zap.String("path", req.URL.Path),
zap.String("remote_addr", req.RemoteAddr),
zap.Any("headers", req.Header))
if grpcWebServer.IsGrpcWebRequest(req) {
grpcWebServer.ServeHTTP(resp, req)
} else {

View File

@ -413,7 +413,7 @@ func spyServerRunnable(s *spyServer, logger *zap.Logger, listenAddr string) (sup
logger.Info("spy server listening", zap.String("addr", l.Addr().String()))
grpcServer := common.NewInstrumentedGRPCServer(logger)
grpcServer := common.NewInstrumentedGRPCServer(logger, common.GrpcLogDetailFull)
spyv1.RegisterSpyRPCServiceServer(grpcServer, s)
return supervisor.GRPCServer(grpcServer, l, false), grpcServer, nil

View File

@ -227,7 +227,7 @@ func init() {
lis = bufconn.Listen(bufSize)
grpcServer := common.NewInstrumentedGRPCServer(logger)
grpcServer := common.NewInstrumentedGRPCServer(logger, common.GrpcLogDetailFull)
mockedSpyServer = newSpyServer(logger)
spyv1.RegisterSpyRPCServiceServer(grpcServer, mockedSpyServer)

View File

@ -2,56 +2,121 @@ package common
import (
"context"
"fmt"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
)
// xForwardedForStreamServerInterceptor adds the `x-forwarded-for` metadata as a tag to cause it to be logged by grpc_zap.StreamServerInterceptor().
// Note that `x-forwarded-for` can only be trusted if the latest hop in the proxy chain is trusted.
// For JSON-Web requests, the latest hop is the guardian itself (`grpc-gateway`), which is listening on TCP and forwarding to the gRPC publicrpc UNIX socket.
// This can be identified by `"peer.address": "@"` in the logs and `grpc-gateway` correctly sets the `x-forwarded-for` metadata.
func xForwardedForStreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()
type GrpcLogDetail string
if md, ok := metadata.FromIncomingContext(ctx); ok {
grpc_ctxtags.Extract(ctx).Set("x-forwarded-for", md.Get("x-forwarded-for"))
const (
GrpcLogDetailNone GrpcLogDetail = "none"
GrpcLogDetailMinimal GrpcLogDetail = "minimal"
GrpcLogDetailFull GrpcLogDetail = "full"
)
func truncateStr(str string, maxLen int) string {
if len(str) > maxLen {
return str[:maxLen] + "..."
}
err := handler(srv, stream)
return err
return str
}
// xForwardedForServerInterceptor adds the `x-forwarded-for` metadata as a tag to cause it to be logged by grpc_zap.UnaryServerInterceptor().
func addDetail(ctx context.Context, logDetail GrpcLogDetail) {
if logDetail == GrpcLogDetailNone {
return
}
if md, ok := metadata.FromIncomingContext(ctx); ok {
tags := grpc_ctxtags.Extract(ctx)
tags.Set("x-forwarded-for", md.Get("x-forwarded-for"))
if logDetail == GrpcLogDetailFull {
if len(md.Get("grpcgateway-user-agent")) > 0 {
tags.Set("user-agent", truncateStr(md.Get("grpcgateway-user-agent")[0], 200))
}
}
}
}
// newMetadataStreamServerInterceptor returns stream interceptor that
// adds the `x-forwarded-for` and, if logDetail == "full", the `user-agent` metadata as a tag to cause it to be logged by grpc_zap.StreamServerInterceptor().
// Note that `x-forwarded-for` can only be trusted if the latest hop in the proxy chain is trusted.
// For JSON-Web requests, the latest hop is the guardian itself (`grpc-gateway`), which is listening on TCP and forwarding to the gRPC publicrpc UNIX socket.
// This can be identified by `"peer.address": "@"` in the logs and `grpc-gateway` correctly sets the `x-forwarded-for` metadata.
func xForwardedForServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
grpc_ctxtags.Extract(ctx).Set("x-forwarded-for", md.Get("x-forwarded-for"))
func newMetadataStreamServerInterceptor(logDetail GrpcLogDetail) func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()
addDetail(ctx, logDetail)
return handler(srv, stream)
}
}
// newMetadataServerInterceptor returns a unary interceptor that
// adds the `x-forwarded-for` and, if logDetail == "full", the `user-agent` metadata as a tag to cause it to be logged by grpc_zap.StreamServerInterceptor().
// Note that `x-forwarded-for` can only be trusted if the latest hop in the proxy chain is trusted.
// For JSON-Web requests, the latest hop is the guardian itself (`grpc-gateway`), which is listening on TCP and forwarding to the gRPC publicrpc UNIX socket.
// This can be identified by `"peer.address": "@"` in the logs and `grpc-gateway` correctly sets the `x-forwarded-for` metadata.
func newMetadataServerInterceptor(logDetail GrpcLogDetail) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
addDetail(ctx, logDetail)
return handler(ctx, req)
}
}
func requestPayloadServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if p, ok := req.(protoreflect.ProtoMessage); ok {
if b, err := protojson.Marshal(p); err == nil {
if len(b) <= 200 {
grpc_ctxtags.Extract(ctx).Set("grpc.requestbody", &protojsonObjectMarshaler{pb: p})
} else {
grpc_ctxtags.Extract(ctx).Set("grpc.requestbody", "too long")
}
}
}
return handler(ctx, req)
}
func NewInstrumentedGRPCServer(logger *zap.Logger) *grpc.Server {
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(),
xForwardedForStreamServerInterceptor,
grpc_prometheus.StreamServerInterceptor,
func NewInstrumentedGRPCServer(logger *zap.Logger, rpcLogDetail GrpcLogDetail) *grpc.Server {
streamInterceptors := []grpc.StreamServerInterceptor{
grpc_ctxtags.StreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor,
}
unaryInterceptors := []grpc.UnaryServerInterceptor{
grpc_ctxtags.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
}
if rpcLogDetail != GrpcLogDetailNone {
logger = logger.With(zap.Bool("_privateLogEntry", true))
streamInterceptors = append(streamInterceptors,
newMetadataStreamServerInterceptor(rpcLogDetail),
grpc_zap.StreamServerInterceptor(logger),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
xForwardedForServerInterceptor,
grpc_prometheus.UnaryServerInterceptor,
)
// if logging detail is "full", also log the request payload (only applicable to unary)
if rpcLogDetail == GrpcLogDetailFull {
unaryInterceptors = append(unaryInterceptors, requestPayloadServerInterceptor)
}
unaryInterceptors = append(unaryInterceptors,
newMetadataServerInterceptor(rpcLogDetail),
grpc_zap.UnaryServerInterceptor(logger),
)),
)
}
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
)
grpc_prometheus.EnableHandlingTimeHistogram()
@ -59,3 +124,22 @@ func NewInstrumentedGRPCServer(logger *zap.Logger) *grpc.Server {
return server
}
// this helper type and associated functions are such that the ZAP jsonEncoder will properly encode the gRPC request payload.
// We could instead just encode the payload to a string here, but then that string will be encoded again by the ZAP jsonEncoder, making downstream processing more difficult
type protojsonObjectMarshaler struct {
pb protoreflect.ProtoMessage
}
func (j *protojsonObjectMarshaler) MarshalLogObject(e zapcore.ObjectEncoder) error {
// ZAP jsonEncoder deals with AddReflect by using json.MarshalObject. The same thing applies for consoleEncoder.
return e.AddReflected("msg", j)
}
func (j *protojsonObjectMarshaler) MarshalJSON() ([]byte, error) {
b, err := protojson.Marshal(j.pb)
if err != nil {
return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
}
return b, nil
}

View File

@ -15,14 +15,16 @@ import (
)
type Telemetry struct {
encoder *encoder
encoder *guardianTelemetryEncoder
serviceAccountJSON []byte
}
type encoder struct {
zapcore.Encoder
logger *logging.Logger
labels map[string]string
// guardianTelemetryEncoder is a wrapper around zapcore.jsonEncoder that logs to google cloud logging
type guardianTelemetryEncoder struct {
zapcore.Encoder // zapcore.jsonEncoder
logger *logging.Logger // Google Cloud logger
labels map[string]string // labels to add to each cloud log
skipPrivateLogs bool
}
// Mirrors the conversion done by zapdriver. We need to convert this
@ -38,7 +40,24 @@ var logLevelSeverity = map[zapcore.Level]logging.Severity{
zapcore.FatalLevel: logging.Emergency,
}
func (enc *encoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) {
func (enc *guardianTelemetryEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) {
// if skipPrivateLogs==true, then private logs don't go to telemetry
if enc.skipPrivateLogs {
for _, f := range fields {
if f.Type == zapcore.BoolType {
if f.Key == "_privateLogEntry" {
if f.Integer == 1 {
// do not forward to telemetry by short-circuiting to the underlying encoder.
return enc.Encoder.EncodeEntry(entry, fields)
} else {
break
}
}
}
}
}
buf, err := enc.Encoder.EncodeEntry(entry, fields)
if err != nil {
return nil, err
@ -62,7 +81,18 @@ func (enc *encoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*b
return buf, nil
}
func New(ctx context.Context, project string, serviceAccountJSON []byte, labels map[string]string) (*Telemetry, error) {
// Clone() clones the encoder. This function is not used by the Guardian itself, but it is used by zapcore.
// Without this implementation, a guardianTelemetryEncoder could get silently converted into the underlying zapcore.Encoder at some point, leading to missing telemetry logs.
func (enc *guardianTelemetryEncoder) Clone() zapcore.Encoder {
return &guardianTelemetryEncoder{
Encoder: enc.Encoder.Clone(),
labels: enc.labels,
}
}
// New creates a new Telemetry logger.
// skipPrivateLogs: if set to `true`, logs with the field zap.Bool("_privateLogEntry", true) will not be logged by telemetry.
func New(ctx context.Context, project string, serviceAccountJSON []byte, skipPrivateLogs bool, labels map[string]string) (*Telemetry, error) {
gc, err := logging.NewClient(ctx, project, option.WithCredentialsJSON(serviceAccountJSON))
if err != nil {
return nil, fmt.Errorf("unable to create logging client: %v", err)
@ -74,10 +104,11 @@ func New(ctx context.Context, project string, serviceAccountJSON []byte, labels
return &Telemetry{
serviceAccountJSON: serviceAccountJSON,
encoder: &encoder{
Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()),
logger: gc.Logger("wormhole"),
labels: labels,
encoder: &guardianTelemetryEncoder{
Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()),
logger: gc.Logger("wormhole"),
labels: labels,
skipPrivateLogs: skipPrivateLogs,
},
}, nil
}