diff --git a/node/pkg/telemetry/telemetry.go b/node/pkg/telemetry/telemetry.go index feb8fbc94..68158d892 100644 --- a/node/pkg/telemetry/telemetry.go +++ b/node/pkg/telemetry/telemetry.go @@ -1,10 +1,12 @@ package telemetry import ( + "bytes" "context" "encoding/json" "fmt" "io" + "time" "cloud.google.com/go/logging" "github.com/blendle/zapdriver" @@ -14,16 +16,39 @@ import ( "google.golang.org/api/option" ) +const telemetryLogLevel = zap.InfoLevel + type Telemetry struct { - encoder *guardianTelemetryEncoder - serviceAccountJSON []byte + encoder *guardianTelemetryEncoder +} + +type ExternalLogger interface { + log(time time.Time, message []byte, level zapcore.Level) + flush() error +} + +type ExternalLoggerGoogleCloud struct { + *logging.Logger + labels map[string]string // labels to add to each cloud log +} + +func (logger *ExternalLoggerGoogleCloud) log(time time.Time, message []byte, level zapcore.Level) { + logger.Log(logging.Entry{ + Timestamp: time, + Payload: message, + Severity: logLevelSeverity[level], + Labels: logger.labels, + }) +} + +func (logger *ExternalLoggerGoogleCloud) flush() error { + return logger.Flush() } // guardianTelemetryEncoder is a wrapper around zapcore.jsonEncoder that logs to google cloud logging type guardianTelemetryEncoder struct { - zapcore.Encoder // zapcore.jsonEncoder - logger *logging.Logger // Google Cloud logger - labels map[string]string // labels to add to each cloud log + zapcore.Encoder // zapcore.jsonEncoder + logger ExternalLogger skipPrivateLogs bool } @@ -41,23 +66,6 @@ var logLevelSeverity = map[zapcore.Level]logging.Severity{ } func (enc *guardianTelemetryEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) { - - // if skipPrivateLogs==true, then private logs don't go to telemetry - if enc.skipPrivateLogs { - for _, f := range fields { - if f.Type == zapcore.BoolType { - if f.Key == "_privateLogEntry" { - if f.Integer == 1 { - // do not forward to telemetry by short-circuiting to the underlying encoder. - return enc.Encoder.EncodeEntry(entry, fields) - } else { - break - } - } - } - } - } - buf, err := enc.Encoder.EncodeEntry(entry, fields) if err != nil { return nil, err @@ -67,16 +75,16 @@ func (enc *guardianTelemetryEncoder) EncodeEntry(entry zapcore.Entry, fields []z bufCopy := make([]byte, len(buf.Bytes())) copy(bufCopy, buf.Bytes()) - // Convert the zapcore.Level to a logging.Severity - severity := logLevelSeverity[entry.Level] + // if skipPrivateLogs==true, then private logs don't go to telemetry + if enc.skipPrivateLogs { + if bytes.Contains(bufCopy, []byte("\"_privateLogEntry\":true")) { + // early return because this is a private entry and it should not go to telemetry + return buf, nil + } + } - // Write raw message to log - enc.logger.Log(logging.Entry{ - Timestamp: entry.Time, - Payload: json.RawMessage(bufCopy), - Severity: severity, - Labels: enc.labels, - }) + // Write raw message to telemetry logger + enc.logger.log(entry.Time, json.RawMessage(bufCopy), entry.Level) return buf, nil } @@ -85,12 +93,23 @@ func (enc *guardianTelemetryEncoder) EncodeEntry(entry zapcore.Entry, fields []z // Without this implementation, a guardianTelemetryEncoder could get silently converted into the underlying zapcore.Encoder at some point, leading to missing telemetry logs. func (enc *guardianTelemetryEncoder) Clone() zapcore.Encoder { return &guardianTelemetryEncoder{ - Encoder: enc.Encoder.Clone(), - labels: enc.labels, + Encoder: enc.Encoder.Clone(), + logger: enc.logger, + skipPrivateLogs: enc.skipPrivateLogs, } } -// New creates a new Telemetry logger. +func NewExternalLogger(skipPrivateLogs bool, externalLogger ExternalLogger) (*Telemetry, error) { + return &Telemetry{ + encoder: &guardianTelemetryEncoder{ + Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()), + logger: externalLogger, + skipPrivateLogs: skipPrivateLogs, + }, + }, nil +} + +// New creates a new Telemetry logger with Google Cloud Logging // skipPrivateLogs: if set to `true`, logs with the field zap.Bool("_privateLogEntry", true) will not be logged by telemetry. func New(ctx context.Context, project string, serviceAccountJSON []byte, skipPrivateLogs bool, labels map[string]string) (*Telemetry, error) { gc, err := logging.NewClient(ctx, project, option.WithCredentialsJSON(serviceAccountJSON)) @@ -103,11 +122,9 @@ func New(ctx context.Context, project string, serviceAccountJSON []byte, skipPri } return &Telemetry{ - serviceAccountJSON: serviceAccountJSON, encoder: &guardianTelemetryEncoder{ Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()), - logger: gc.Logger("wormhole"), - labels: labels, + logger: &ExternalLoggerGoogleCloud{Logger: gc.Logger("wormhole"), labels: labels}, skipPrivateLogs: skipPrivateLogs, }, }, nil @@ -117,7 +134,7 @@ func (s *Telemetry) WrapLogger(logger *zap.Logger) *zap.Logger { tc := zapcore.NewCore( s.encoder, zapcore.AddSync(io.Discard), - zap.InfoLevel, + telemetryLogLevel, ) return logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { @@ -126,5 +143,5 @@ func (s *Telemetry) WrapLogger(logger *zap.Logger) *zap.Logger { } func (s *Telemetry) Close() error { - return s.encoder.logger.Flush() + return s.encoder.logger.flush() } diff --git a/node/pkg/telemetry/telemetry_test.go b/node/pkg/telemetry/telemetry_test.go new file mode 100644 index 000000000..4da00b92e --- /dev/null +++ b/node/pkg/telemetry/telemetry_test.go @@ -0,0 +1,94 @@ +package telemetry + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// externalLoggerMock doesn't log anything. It can optionally increase an atomic counter `eventCounter` if provided. +type externalLoggerMock struct { + eventCounter *atomic.Int64 +} + +func (logger *externalLoggerMock) log(time time.Time, message []byte, level zapcore.Level) { + if logger.eventCounter != nil { + logger.eventCounter.Add(1) + } +} +func (logger *externalLoggerMock) flush() error { + return nil +} + +func TestTelemetryWithPrivate(t *testing.T) { + // setup + logger, _ := zap.NewDevelopment() + var eventCounter atomic.Int64 + var expectedCounter int64 = 0 + + externalLogger := &externalLoggerMock{eventCounter: &eventCounter} + tm, err := NewExternalLogger(true, externalLogger) + if err != nil { + logger.Fatal("Failed to initialize telemetry", zap.Error(err)) + } + defer tm.Close() + logger = tm.WrapLogger(logger) + + // test a single private log entry + logger.Log(zap.InfoLevel, "Single private log", zap.Bool("_privateLogEntry", true)) + + // test a private logger + loggerPrivate := logger.With(zap.Bool("_privateLogEntry", true)) + loggerPrivate.Log(zap.InfoLevel, "Private logger message 1") + loggerPrivate.Log(zap.InfoLevel, "Private logger message 2") + assert.Equal(t, expectedCounter, eventCounter.Load()) + + // test logging in a child logger + logger2 := logger.With(zap.String("child", "logger")) + logger2.Log(zap.InfoLevel, "hi") + expectedCounter++ + assert.Equal(t, expectedCounter, eventCounter.Load()) + + // try to trick logger into not logging to telemetry with user-controlled input + logger.Log(zap.InfoLevel, "can I trick you?", zap.ByteString("user-controlled", []byte("\"_privateLogEntry\":true"))) + expectedCounter++ + // user-controlled parameter + logger.Log(zap.InfoLevel, "can I trick you?", zap.String("user-controlled", "\"_privateLogEntry\":true")) + expectedCounter++ + // user-controlled message + logger.Log(zap.InfoLevel, "\"_privateLogEntry\":true", zap.String("", "")) + expectedCounter++ + assert.Equal(t, expectedCounter, eventCounter.Load()) +} + +func TestTelemetryWithOutPrivate(t *testing.T) { + // setup + logger, _ := zap.NewDevelopment() + var eventCounter atomic.Int64 + + externalLogger := &externalLoggerMock{eventCounter: &eventCounter} + tm, err := NewExternalLogger(false, externalLogger) + if err != nil { + logger.Fatal("Failed to initialize telemetry", zap.Error(err)) + } + defer tm.Close() + logger = tm.WrapLogger(logger) + + // test a single private log entry + logger.Log(zap.InfoLevel, "Single private log", zap.Bool("_privateLogEntry", true)) + + // test a private logger + loggerPrivate := logger.With(zap.Bool("_privateLogEntry", true)) + loggerPrivate.Log(zap.InfoLevel, "Private logger message 1") + loggerPrivate.Log(zap.InfoLevel, "Private logger message 2") + assert.Equal(t, int64(3), eventCounter.Load()) + + // test logging in a child logger + logger2 := logger.With(zap.String("child", "logger")) + logger2.Log(zap.InfoLevel, "hi") + assert.Equal(t, int64(4), eventCounter.Load()) +}