wormhole-explorer/fly/cmd/backfiller/backfiler.go

113 lines
1.8 KiB
Go

package main
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"log"
"os"
"time"
"github.com/briandowns/spinner"
"github.com/schollz/progressbar/v3"
)
func RunBackfiller(cfg WorkerConfiguration, workerFunc GenericWorker) {
ctx := context.Background()
wp := NewWorkpool(ctx, cfg, workerFunc)
b := Backfiller{
Filename: cfg.Filename,
Workpool: wp,
}
err := b.Run()
if err != nil {
fmt.Println(err)
}
fmt.Println("done backfiller!")
}
type Backfiller struct {
Filename string
Strategy string
Workpool *Workpool
}
func (b *Backfiller) Run() error {
f, err := os.Open(b.Filename)
if err != nil {
return err
}
s := spinner.New(spinner.CharSets[14], 100*time.Millisecond)
s.Color("red")
s.Suffix = fmt.Sprintf(" counting lines")
s.Start()
pLines, err := b.countLines()
if err != nil {
return err
}
s.Stop()
fmt.Printf("lines: %d \n ", pLines)
b.Workpool.Bar = progressbar.Default(int64(pLines))
counter := 0
defer f.Close()
r := bufio.NewReader(f)
// read file line by line and send to workpool
for {
line, _, err := r.ReadLine() //loading chunk into buffer
if err != nil {
if err == io.EOF {
break
}
log.Fatalf("a real error happened here: %v\n", err)
}
b.Workpool.Queue <- string(line)
counter += 1
}
// send exit signal to all workers
for i := 0; i < b.Workpool.Workers; i++ {
b.Workpool.Queue <- "exit"
}
// wait for all workers to finish
b.Workpool.WG.Wait()
fmt.Printf("processed %d lines\n", counter)
return nil
}
func (b *Backfiller) countLines() (int, error) {
file, _ := os.Open(b.Filename)
defer file.Close()
buf := make([]byte, 32*1024)
count := 0
lineSep := []byte{'\n'}
for {
c, err := file.Read(buf)
count += bytes.Count(buf[:c], lineSep)
switch {
case err == io.EOF:
return count, nil
case err != nil:
return count, err
}
}
}