diff --git a/cmd/radiance/blockstore/verifydata/verifydata.go b/cmd/radiance/blockstore/verifydata/verifydata.go index a41590a..ba94fd3 100644 --- a/cmd/radiance/blockstore/verifydata/verifydata.go +++ b/cmd/radiance/blockstore/verifydata/verifydata.go @@ -2,6 +2,7 @@ package verifydata import ( "context" + "io" "os" "runtime" "sync/atomic" @@ -9,7 +10,10 @@ import ( "github.com/certusone/radiance/pkg/blockstore" "github.com/linxGnu/grocksdb" + "github.com/mattn/go-isatty" "github.com/spf13/cobra" + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" "golang.org/x/sync/errgroup" "k8s.io/klog/v2" ) @@ -74,32 +78,60 @@ func run(c *cobra.Command, args []string) { // stats trackers var numSuccess atomic.Uint64 + var numSkipped atomic.Uint64 var numFailure atomic.Uint32 // application lifetime - ctx := c.Context() - ctx, cancel := context.WithCancel(ctx) + rootCtx := c.Context() + ctx, cancel := context.WithCancel(rootCtx) defer cancel() group, ctx := errgroup.WithContext(ctx) stats := func() { - klog.Infof("[stats] good=%d bad=%d", numSuccess.Load(), numFailure.Load()) + klog.Infof("[stats] good=%d skipped=%d bad=%d", + numSuccess.Load(), numSkipped.Load(), numFailure.Load()) + } + + var barOutput io.Writer + isAtty := isatty.IsTerminal(os.Stderr.Fd()) + if isAtty { + barOutput = os.Stderr + } else { + barOutput = io.Discard + } + + progress := mpb.NewWithContext(ctx, mpb.WithOutput(barOutput)) + bar := progress.New(int64(total), mpb.BarStyle(), + mpb.PrependDecorators( + decor.Spinner(nil), + decor.CurrentNoUnit(" %d"), + decor.TotalNoUnit(" / %d slots"), + decor.NewPercentage(" (% d)"), + ), + mpb.AppendDecorators( + decor.Name("eta="), + decor.AverageETA(decor.ET_STYLE_GO), + )) + + if isAtty { + klog.LogToStderr(false) + klog.SetOutput(progress) } statInterval := *flagStatIvl if statInterval > 0 { ticker := time.NewTicker(statInterval) - group.Go(func() error { + go func() { defer ticker.Stop() for { select { case <-ctx.Done(): - return nil + return case <-ticker.C: stats() } } - }) + }() } for i := uint(0); i < workers; i++ { @@ -114,9 +146,13 @@ func run(c *cobra.Command, args []string) { break } + klog.Infof("[worker %d]: range=[%d:%d]", i, wLo, wHi) w := &worker{ + id: i, + bar: bar, stop: wHi, numSuccess: &numSuccess, + numSkipped: &numSkipped, numFailures: &numFailure, maxFailures: *flagMaxErrs, } @@ -131,9 +167,12 @@ func run(c *cobra.Command, args []string) { if err := group.Wait(); err != nil { klog.Errorf("Aborting: %s", err) exitCode = 1 - } else { + } else if err = rootCtx.Err(); err == nil { klog.Info("Done!") exitCode = 0 + } else { + klog.Infof("Aborted: %s", err) + exitCode = 1 } stats() diff --git a/cmd/radiance/blockstore/verifydata/worker.go b/cmd/radiance/blockstore/verifydata/worker.go index 0f452ac..f1a5370 100644 --- a/cmd/radiance/blockstore/verifydata/worker.go +++ b/cmd/radiance/blockstore/verifydata/worker.go @@ -4,25 +4,33 @@ import ( "context" "fmt" "sync/atomic" + "time" "github.com/certusone/radiance/pkg/blockstore" "github.com/linxGnu/grocksdb" + "github.com/vbauerster/mpb/v8" "k8s.io/klog/v2" ) // worker does a single pass over blockstore.CfMeta and blockstore.CfDataShred concurrently. type worker struct { + id uint meta *grocksdb.Iterator shred *grocksdb.Iterator // Slot range - stop uint64 + current uint64 + stop uint64 + ts time.Time + bar *mpb.Bar numSuccess *atomic.Uint64 + numSkipped *atomic.Uint64 numFailures *atomic.Uint32 maxFailures uint32 } func (w *worker) init(db *blockstore.DB, start uint64) { + w.current = start w.meta = db.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), db.CfMeta) w.shred = db.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), db.CfDataShred) slotKey := blockstore.MakeSlotKey(start) @@ -62,9 +70,12 @@ func (w *worker) readSlot() (shouldContinue bool) { // Remember failure and increment failure counter before returning. var metaSlot uint64 success := false + var isFull bool defer func() { + if !isFull { + return + } if success { - klog.V(3).Infof("slot %d: ok", metaSlot) w.numSuccess.Add(1) } else { if w.shouldAbort(w.numFailures.Add(1)) { @@ -80,6 +91,32 @@ func (w *worker) readSlot() (shouldContinue bool) { klog.Warningf("Skipping invalid slot key: %x", w.meta.Key().Data()) return } + if metaSlot >= w.stop { + return false + } + defer func() { + if success { + if isFull { + klog.V(3).Infof("[worker %d]: slot %d: ok", w.id, metaSlot) + } else { + klog.V(3).Infof("[worker %d]: slot %d: skipped", w.id, metaSlot) + } + } + }() + + // Update progress bar + step := metaSlot - w.current + if step == 0 { + step = 1 + } + if metaSlot < w.current { + step = 0 // ??? + } + w.bar.IncrInt64(int64(step)) + w.current = metaSlot + if step > 1 { + w.numSkipped.Add(step - 1) + } // Shred iterator should follow meta iter shredSlot, _, ok := blockstore.ParseShredKey(w.shred.Key().Data()) @@ -112,6 +149,11 @@ func (w *worker) readSlot() (shouldContinue bool) { klog.Warningf("slot %d: invalid meta: %s", metaSlot, err) return } + if isFull = meta.IsFull(); !isFull { + w.numSkipped.Add(1) + success = true + return + } // Read data shreds. shreds, err := blockstore.GetDataShredsFromIter(w.shred, metaSlot, 0, uint32(meta.Received)) diff --git a/go.mod b/go.mod index bc3f477..81dc0d6 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/klauspost/compress v1.15.9 github.com/linxGnu/grocksdb v1.7.7 github.com/lucas-clemente/quic-go v0.27.2 + github.com/mattn/go-isatty v0.0.16 github.com/mr-tron/base58 v1.2.0 github.com/novifinancial/serde-reflection/serde-generate/runtime/golang v0.0.0-20220519162058-e5cd3c3b3f3a github.com/prometheus/client_golang v1.13.0 @@ -23,6 +24,7 @@ require ( github.com/spf13/cobra v1.5.0 github.com/stretchr/testify v1.8.0 github.com/twmb/franz-go v1.6.0 + github.com/vbauerster/mpb/v8 v8.0.2 golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 @@ -38,6 +40,8 @@ require ( cloud.google.com/go/iam v0.3.0 // indirect contrib.go.opencensus.io/exporter/stackdriver v0.13.13 // indirect filippo.io/edwards25519 v1.0.0 // indirect + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -73,7 +77,7 @@ require ( github.com/marten-seemann/qtls-go1-17 v0.1.2 // indirect github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/mdlayher/netlink v1.6.0 // indirect github.com/mdlayher/socket v0.2.3 // indirect @@ -89,6 +93,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/streamingfast/logging v0.0.0-20220813175024-b4fbb0e893df // indirect github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569 // indirect diff --git a/go.sum b/go.sum index de369d0..04808d0 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,10 @@ github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtix github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed h1:eqa6queieK8SvoszxCu0WwH7lSVeL4/N/f1JwOMw1G4= github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed/go.mod h1:rA52xkgZwql9LRZXWb2arHEFP6qSR48KY2xOfWzEciQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -417,6 +421,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -516,6 +522,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -615,6 +623,8 @@ github.com/twmb/franz-go/pkg/kmsg v1.1.0 h1:csckTxG48q7Tem7ZwMxe2jAb0ehDNglxZccG github.com/twmb/franz-go/pkg/kmsg v1.1.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +github.com/vbauerster/mpb/v8 v8.0.2 h1:alVQG69Jg5+Ku9Hu1dakDx50uACEHnIzS7i356NQ/Vs= +github.com/vbauerster/mpb/v8 v8.0.2/go.mod h1:Z9VJYIzXls7xZwirZjShGsi+14enzJhQfGyb/XZK0ZQ= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=