node/telemetry: fix nullpointer, _privateLogEntry, tests (#2482)

1. Fix a nullpointer dereference bug caused by not cloning everything in (enc *guardianTelemetryEncoder) Clone()
2. Fix a bug that caused logs marked with _privateLogEntry to still be sent to telemetry because when a child logger is created (logger.With(zap.Bool("_privateLogEntry", true))) the extra fields will go into the encoder's buffer and will not be passed to EncodeEntry(entry zapcore.Entry, fields []zapcore.Field). This bug had no production impact because the telemetry logger was broken (see (1))
3. make the telemetry module testable and add unit tests

Co-authored-by: tbjump <>
This commit is contained in:
tbjump 2023-03-08 11:58:51 -08:00 committed by GitHub
parent 9b3458a909
commit 6def9b306c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 150 additions and 39 deletions

View File

@ -1,10 +1,12 @@
package telemetry
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"time"
"cloud.google.com/go/logging"
"github.com/blendle/zapdriver"
@ -14,16 +16,39 @@ import (
"google.golang.org/api/option"
)
const telemetryLogLevel = zap.InfoLevel
type Telemetry struct {
encoder *guardianTelemetryEncoder
serviceAccountJSON []byte
encoder *guardianTelemetryEncoder
}
type ExternalLogger interface {
log(time time.Time, message []byte, level zapcore.Level)
flush() error
}
type ExternalLoggerGoogleCloud struct {
*logging.Logger
labels map[string]string // labels to add to each cloud log
}
func (logger *ExternalLoggerGoogleCloud) log(time time.Time, message []byte, level zapcore.Level) {
logger.Log(logging.Entry{
Timestamp: time,
Payload: message,
Severity: logLevelSeverity[level],
Labels: logger.labels,
})
}
func (logger *ExternalLoggerGoogleCloud) flush() error {
return logger.Flush()
}
// guardianTelemetryEncoder is a wrapper around zapcore.jsonEncoder that logs to google cloud logging
type guardianTelemetryEncoder struct {
zapcore.Encoder // zapcore.jsonEncoder
logger *logging.Logger // Google Cloud logger
labels map[string]string // labels to add to each cloud log
zapcore.Encoder // zapcore.jsonEncoder
logger ExternalLogger
skipPrivateLogs bool
}
@ -41,23 +66,6 @@ var logLevelSeverity = map[zapcore.Level]logging.Severity{
}
func (enc *guardianTelemetryEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) {
// if skipPrivateLogs==true, then private logs don't go to telemetry
if enc.skipPrivateLogs {
for _, f := range fields {
if f.Type == zapcore.BoolType {
if f.Key == "_privateLogEntry" {
if f.Integer == 1 {
// do not forward to telemetry by short-circuiting to the underlying encoder.
return enc.Encoder.EncodeEntry(entry, fields)
} else {
break
}
}
}
}
}
buf, err := enc.Encoder.EncodeEntry(entry, fields)
if err != nil {
return nil, err
@ -67,16 +75,16 @@ func (enc *guardianTelemetryEncoder) EncodeEntry(entry zapcore.Entry, fields []z
bufCopy := make([]byte, len(buf.Bytes()))
copy(bufCopy, buf.Bytes())
// Convert the zapcore.Level to a logging.Severity
severity := logLevelSeverity[entry.Level]
// if skipPrivateLogs==true, then private logs don't go to telemetry
if enc.skipPrivateLogs {
if bytes.Contains(bufCopy, []byte("\"_privateLogEntry\":true")) {
// early return because this is a private entry and it should not go to telemetry
return buf, nil
}
}
// Write raw message to log
enc.logger.Log(logging.Entry{
Timestamp: entry.Time,
Payload: json.RawMessage(bufCopy),
Severity: severity,
Labels: enc.labels,
})
// Write raw message to telemetry logger
enc.logger.log(entry.Time, json.RawMessage(bufCopy), entry.Level)
return buf, nil
}
@ -85,12 +93,23 @@ func (enc *guardianTelemetryEncoder) EncodeEntry(entry zapcore.Entry, fields []z
// Without this implementation, a guardianTelemetryEncoder could get silently converted into the underlying zapcore.Encoder at some point, leading to missing telemetry logs.
func (enc *guardianTelemetryEncoder) Clone() zapcore.Encoder {
return &guardianTelemetryEncoder{
Encoder: enc.Encoder.Clone(),
labels: enc.labels,
Encoder: enc.Encoder.Clone(),
logger: enc.logger,
skipPrivateLogs: enc.skipPrivateLogs,
}
}
// New creates a new Telemetry logger.
func NewExternalLogger(skipPrivateLogs bool, externalLogger ExternalLogger) (*Telemetry, error) {
return &Telemetry{
encoder: &guardianTelemetryEncoder{
Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()),
logger: externalLogger,
skipPrivateLogs: skipPrivateLogs,
},
}, nil
}
// New creates a new Telemetry logger with Google Cloud Logging
// skipPrivateLogs: if set to `true`, logs with the field zap.Bool("_privateLogEntry", true) will not be logged by telemetry.
func New(ctx context.Context, project string, serviceAccountJSON []byte, skipPrivateLogs bool, labels map[string]string) (*Telemetry, error) {
gc, err := logging.NewClient(ctx, project, option.WithCredentialsJSON(serviceAccountJSON))
@ -103,11 +122,9 @@ func New(ctx context.Context, project string, serviceAccountJSON []byte, skipPri
}
return &Telemetry{
serviceAccountJSON: serviceAccountJSON,
encoder: &guardianTelemetryEncoder{
Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()),
logger: gc.Logger("wormhole"),
labels: labels,
logger: &ExternalLoggerGoogleCloud{Logger: gc.Logger("wormhole"), labels: labels},
skipPrivateLogs: skipPrivateLogs,
},
}, nil
@ -117,7 +134,7 @@ func (s *Telemetry) WrapLogger(logger *zap.Logger) *zap.Logger {
tc := zapcore.NewCore(
s.encoder,
zapcore.AddSync(io.Discard),
zap.InfoLevel,
telemetryLogLevel,
)
return logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
@ -126,5 +143,5 @@ func (s *Telemetry) WrapLogger(logger *zap.Logger) *zap.Logger {
}
func (s *Telemetry) Close() error {
return s.encoder.logger.Flush()
return s.encoder.logger.flush()
}

View File

@ -0,0 +1,94 @@
package telemetry
import (
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// externalLoggerMock doesn't log anything. It can optionally increase an atomic counter `eventCounter` if provided.
type externalLoggerMock struct {
eventCounter *atomic.Int64
}
func (logger *externalLoggerMock) log(time time.Time, message []byte, level zapcore.Level) {
if logger.eventCounter != nil {
logger.eventCounter.Add(1)
}
}
func (logger *externalLoggerMock) flush() error {
return nil
}
func TestTelemetryWithPrivate(t *testing.T) {
// setup
logger, _ := zap.NewDevelopment()
var eventCounter atomic.Int64
var expectedCounter int64 = 0
externalLogger := &externalLoggerMock{eventCounter: &eventCounter}
tm, err := NewExternalLogger(true, externalLogger)
if err != nil {
logger.Fatal("Failed to initialize telemetry", zap.Error(err))
}
defer tm.Close()
logger = tm.WrapLogger(logger)
// test a single private log entry
logger.Log(zap.InfoLevel, "Single private log", zap.Bool("_privateLogEntry", true))
// test a private logger
loggerPrivate := logger.With(zap.Bool("_privateLogEntry", true))
loggerPrivate.Log(zap.InfoLevel, "Private logger message 1")
loggerPrivate.Log(zap.InfoLevel, "Private logger message 2")
assert.Equal(t, expectedCounter, eventCounter.Load())
// test logging in a child logger
logger2 := logger.With(zap.String("child", "logger"))
logger2.Log(zap.InfoLevel, "hi")
expectedCounter++
assert.Equal(t, expectedCounter, eventCounter.Load())
// try to trick logger into not logging to telemetry with user-controlled input
logger.Log(zap.InfoLevel, "can I trick you?", zap.ByteString("user-controlled", []byte("\"_privateLogEntry\":true")))
expectedCounter++
// user-controlled parameter
logger.Log(zap.InfoLevel, "can I trick you?", zap.String("user-controlled", "\"_privateLogEntry\":true"))
expectedCounter++
// user-controlled message
logger.Log(zap.InfoLevel, "\"_privateLogEntry\":true", zap.String("", ""))
expectedCounter++
assert.Equal(t, expectedCounter, eventCounter.Load())
}
func TestTelemetryWithOutPrivate(t *testing.T) {
// setup
logger, _ := zap.NewDevelopment()
var eventCounter atomic.Int64
externalLogger := &externalLoggerMock{eventCounter: &eventCounter}
tm, err := NewExternalLogger(false, externalLogger)
if err != nil {
logger.Fatal("Failed to initialize telemetry", zap.Error(err))
}
defer tm.Close()
logger = tm.WrapLogger(logger)
// test a single private log entry
logger.Log(zap.InfoLevel, "Single private log", zap.Bool("_privateLogEntry", true))
// test a private logger
loggerPrivate := logger.With(zap.Bool("_privateLogEntry", true))
loggerPrivate.Log(zap.InfoLevel, "Private logger message 1")
loggerPrivate.Log(zap.InfoLevel, "Private logger message 2")
assert.Equal(t, int64(3), eventCounter.Load())
// test logging in a child logger
logger2 := logger.With(zap.String("child", "logger"))
logger2.Log(zap.InfoLevel, "hi")
assert.Equal(t, int64(4), eventCounter.Load())
}