From d73bce429ac31df892c0a4879b011890a92ef246 Mon Sep 17 00:00:00 2001
From: tbjump
Date: Wed, 5 Jul 2023 23:17:23 +0000
Subject: [PATCH] node/node_test: count the amount of generated logs

---
 node/pkg/db/open.go         |  2 +-
 node/pkg/node/logcounter.go | 42 ++++++++++++++++++++++++++++++++++++++++++
 node/pkg/node/node_test.go  | 27 ++++++++++++++++++---------
 3 files changed, 61 insertions(+), 10 deletions(-)
 create mode 100644 node/pkg/node/logcounter.go

diff --git a/node/pkg/db/open.go b/node/pkg/db/open.go
index f7adfd997..513258db2 100644
--- a/node/pkg/db/open.go
+++ b/node/pkg/db/open.go
@@ -30,7 +30,7 @@ func (l badgerZapLogger) Debugf(f string, v ...interface{}) {
 }
 
 func OpenDb(logger *zap.Logger, dataDir *string) *Database {
-	options := badger.DefaultOptions(dbPath)
+	var options badger.Options
 
 	if dataDir != nil {
 		dbPath := path.Join(*dataDir, "db")
diff --git a/node/pkg/node/logcounter.go b/node/pkg/node/logcounter.go
new file mode 100644
index 000000000..b3759a3db
--- /dev/null
+++ b/node/pkg/node/logcounter.go
@@ -0,0 +1,42 @@
+package node
+
+import (
+	"sync/atomic"
+
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+type LogSizeCounter struct {
+	level zapcore.Level
+	ctr   atomic.Uint64
+}
+
+func NewLogSizeCounter(lvl zapcore.Level) *LogSizeCounter {
+	return &LogSizeCounter{
+		level: lvl,
+	}
+}
+
+func (lc *LogSizeCounter) Reset() uint64 {
+	// Swap reads and zeroes the counter atomically so concurrent writes are not lost.
+	n := lc.ctr.Swap(0)
+	return n
+}
+
+func (lc *LogSizeCounter) Sync() error { return nil }
+
+func (lc *LogSizeCounter) Write(p []byte) (n int, err error) {
+	n = len(p)
+	lc.ctr.Add(uint64(n))
+	return n, nil
+}
+
+func (lc *LogSizeCounter) Core() zapcore.Core {
+	var output zapcore.WriteSyncer = lc
+	encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
+	priority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
+		return lvl >= lc.level
+	})
+	return zapcore.NewCore(encoder, output, priority)
+}
diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go
index 5290d5ed6..1798a671e 100644
--- a/node/pkg/node/node_test.go
+++ b/node/pkg/node/node_test.go
@@ -175,6 +175,9 @@ func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uin
 	adminSocketPath := mockAdminStocket(testId, mockGuardianIndex)
 	rpcMap := make(map[string]string)
 
+	// Set this to None so these RPC logs are not counted when measuring the amount of logs generated per message.
+	publicRpcLogDetail := common.GrpcLogDetailNone
+
 	// assemble all the options
 	guardianOptions := []*GuardianOption{
 		GuardianOptionDatabase(db),
@@ -182,8 +185,8 @@ func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uin
 		GuardianOptionNoAccountant(), // disable accountant
 		GuardianOptionGovernor(true),
 		GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, p2pPort, func() string { return "" }),
-		GuardianOptionPublicRpcSocket(publicSocketPath, common.GrpcLogDetailFull),
-		GuardianOptionPublicrpcTcpService(publicRpc, common.GrpcLogDetailFull),
+		GuardianOptionPublicRpcSocket(publicSocketPath, publicRpcLogDetail),
+		GuardianOptionPublicrpcTcpService(publicRpc, publicRpcLogDetail),
 		GuardianOptionPublicWeb(mockPublicWeb(testId, mockGuardianIndex), publicSocketPath, "", false, ""),
 		GuardianOptionAdminService(adminSocketPath, nil, nil, rpcMap),
 		GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", mockStatusPort(testId, mockGuardianIndex))),
@@ -208,11 +211,12 @@ func mockGuardianRunnable(testId uint, gs []*mockGuardian, mockGuardianIndex uin
 }
 
 // setupLogsCapture is a helper function for making a zap logger/observer combination for testing that certain logs have been made
-func setupLogsCapture(options ...zap.Option) (*zap.Logger, *observer.ObservedLogs) {
+func setupLogsCapture(options ...zap.Option) (*zap.Logger, *observer.ObservedLogs, *LogSizeCounter) {
 	observedCore, observedLogs := observer.New(zap.DebugLevel)
 	consoleLogger, _ := zap.NewDevelopment(zap.IncreaseLevel(CONSOLE_LOG_LEVEL))
-	parentLogger := zap.New(zapcore.NewTee(observedCore, consoleLogger.Core()), options...)
-	return parentLogger, observedLogs
+	lc := NewLogSizeCounter(zap.InfoLevel)
+	parentLogger := zap.New(zapcore.NewTee(observedCore, consoleLogger.Core(), lc.Core()), options...)
+	return parentLogger, observedLogs, lc
 }
 
 func waitForHeartbeatsInLogs(t testing.TB, zapObserver *observer.ObservedLogs, gs []*mockGuardian) {
@@ -574,7 +578,7 @@ func testConsensus(t *testing.T, testCases []testCase, numGuardians int) {
 	rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), testTimeout)
 	defer rootCtxCancel()
 
-	zapLogger, zapObserver := setupLogsCapture()
+	zapLogger, zapObserver, _ := setupLogsCapture()
 
 	supervisor.New(rootCtx, zapLogger, func(ctx context.Context) error {
 		logger := supervisor.Logger(ctx)
@@ -839,7 +843,7 @@ func testGuardianConfigurations(t *testing.T, testCases []testCaseGuardianConfig
 		// The panic will be subsequently caught by the supervisor
 		fatalHook := make(fatalHook)
 		defer close(fatalHook)
-		zapLogger, zapObserver := setupLogsCapture(zap.WithFatalHook(fatalHook))
+		zapLogger, zapObserver, _ := setupLogsCapture(zap.WithFatalHook(fatalHook))
 
 		supervisor.New(rootCtx, zapLogger, func(ctx context.Context) error {
 			// Create a sub-context with cancel function that we can pass to G.run.
@@ -996,7 +1000,8 @@ func BenchmarkConsensus(b *testing.B) {
 	//CONSOLE_LOG_LEVEL = zap.DebugLevel
 	//CONSOLE_LOG_LEVEL = zap.InfoLevel
 	CONSOLE_LOG_LEVEL = zap.WarnLevel
-	benchmarkConsensus(b, "1", 19, 1000, 2) // ~28s
+	//benchmarkConsensus(b, "1", 19, 1000, 2) // ~28s
+	benchmarkConsensus(b, "1", 19, 100, 1) // ~3s
 	//benchmarkConsensus(b, "1", 19, 100, 2) // ~2s
 	//benchmarkConsensus(b, "1", 19, 100, 3) // sometimes fails, i.e. too much parallelism
 	//benchmarkConsensus(b, "1", 19, 100, 10) // sometimes fails, i.e. too much parallelism
@@ -1018,7 +1023,7 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages
 	rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), testTimeout)
 	defer rootCtxCancel()
 
-	zapLogger, zapObserver := setupLogsCapture()
+	zapLogger, zapObserver, logSizeCounter := setupLogsCapture()
 
 	supervisor.New(rootCtx, zapLogger, func(ctx context.Context) error {
 		logger := supervisor.Logger(ctx)
@@ -1079,6 +1084,7 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages
 		c := publicrpcv1.NewPublicRPCServiceClient(conn)
 
 		logger.Info("-----------Beginning benchmark-----------")
+		logSizeCounter.Reset()
 		t.ResetTimer()
 
 		// nextObsReadyC ensures that there are not more than `maxPendingObs` observations pending at any given point in time.
@@ -1125,6 +1131,9 @@ func benchmarkConsensus(t *testing.B, name string, numGuardians int, numMessages
 		// We're done!
 		logger.Info("Tests completed.")
 		t.StopTimer()
+		logsize := logSizeCounter.Reset()
+		logsize = logsize / uint64(numMessages) / uint64(numGuardians) // normalize to bytes per message per guardian
+		logger.Warn("benchmarkConsensus: logsize report", zap.Uint64("logbytes_per_msg_per_guardian", logsize))
 		supervisor.Signal(ctx, supervisor.SignalDone)
 		rootCtxCancel()
 		return nil
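Not part of the patch: a minimal sketch of how the new LogSizeCounter could be exercised on its own, assuming it sits alongside logcounter.go in the node package. The test name and log messages are made up for illustration; only NewLogSizeCounter, Core, and Reset come from the change above.

package node

import (
	"testing"

	"go.uber.org/zap"
)

// TestLogSizeCounter (hypothetical, not part of this patch) checks that only entries
// at or above the configured level are counted, and that Reset returns the byte
// count and zeroes the counter.
func TestLogSizeCounter(t *testing.T) {
	lc := NewLogSizeCounter(zap.InfoLevel)
	logger := zap.New(lc.Core())

	logger.Debug("below InfoLevel, filtered out by the core's LevelEnabler")
	logger.Info("encoded as JSON and counted", zap.Int("n", 1))

	if got := lc.Reset(); got == 0 {
		t.Fatalf("expected a non-zero byte count after the Info entry, got %d", got)
	}
	if got := lc.Reset(); got != 0 {
		t.Fatalf("expected the counter to read zero after Reset, got %d", got)
	}
}

Because Reset both reports and clears the running total, calling it right before the measured section (as benchmarkConsensus does just before t.ResetTimer) keeps setup logging out of the reported per-message figure.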