Fly backfiller (#106)

* change repo to support backfill of txhash

* generic backfiller for vaas and txhash

* update deps

* remove unused code

---------

Co-authored-by: gipsh <gipsh@gmail.com>
This commit is contained in:
gipsh 2023-01-30 10:20:38 -03:00 committed by GitHub
parent ca6710d5d5
commit 2db8f58092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 387 additions and 11 deletions

View File

@ -0,0 +1,31 @@
# generic backfiller
reads a bulk csv dump of stuff (VAAs, TxHash, etc) and upsert it into mongodb
## compile
```bash
go build
```
## run
```bash
./backfiller -strategy vaa -file test.csv
```
When you run the backfiller must pass a valid strategy
Current supported strategies are:
- `vaa` for backfilling VAAs
- `txhash` for backfilling of txHash
## config
The mongodb uri is set via env using the variable `MONGODB_URI`
If is not set it will use the default `mongodb://localhost:27017/`

145
fly/cmd/backfiller/main.go Normal file
View File

@ -0,0 +1,145 @@
package main
import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"os"
"time"
"github.com/briandowns/spinner"
"github.com/schollz/progressbar/v3"
)
type Backfiller struct {
Filename string
Strategy string
Workpool *Workpool
}
func (b *Backfiller) Run() error {
f, err := os.Open(b.Filename)
if err != nil {
return err
}
s := spinner.New(spinner.CharSets[14], 100*time.Millisecond)
s.Color("red")
s.Suffix = fmt.Sprintf(" counting lines")
s.Start()
pLines, err := b.countLines()
if err != nil {
return err
}
s.Stop()
fmt.Printf("lines: %d \n ", pLines)
b.Workpool.Bar = progressbar.Default(int64(pLines))
counter := 0
defer f.Close()
r := bufio.NewReader(f)
// read file line by line and send to workpool
for {
line, _, err := r.ReadLine() //loading chunk into buffer
if err != nil {
if err == io.EOF {
break
}
log.Fatalf("a real error happened here: %v\n", err)
}
b.Workpool.Queue <- string(line)
counter += 1
}
// send exit signal to all workers
for i := 0; i < b.Workpool.Workers; i++ {
b.Workpool.Queue <- "exit"
}
// wait for all workers to finish
b.Workpool.WG.Wait()
fmt.Printf("processed %d lines\n", counter)
return nil
}
func (b *Backfiller) countLines() (int, error) {
file, _ := os.Open(b.Filename)
defer file.Close()
buf := make([]byte, 32*1024)
count := 0
lineSep := []byte{'\n'}
for {
c, err := file.Read(buf)
count += bytes.Count(buf[:c], lineSep)
switch {
case err == io.EOF:
return count, nil
case err != nil:
return count, err
}
}
}
func main() {
var filename string
var strategy string
flag.StringVar(&filename, "file", "", "file to process (mandatory)")
flag.StringVar(&strategy, "strategy", "vaa", "strategy to use (vaa|txhash)")
flag.Parse()
if os.Getenv("MONGODB_URI") == "" {
os.Setenv("MONGODB_URI", "mongodb://localhost:27017/")
fmt.Println("MONGODB_URI not set, using default")
}
if filename == "" {
flag.Usage()
os.Exit(1)
}
ctx := context.Background()
// choose strategy
var worker GenericWorker
switch strategy {
case "vaa":
worker = workerVaa
case "txhash":
worker = workerTxHash
default:
flag.Usage()
os.Exit(1)
}
wp := NewWorkpool(ctx, 100, worker)
b := Backfiller{
Filename: filename,
Workpool: wp,
}
err := b.Run()
if err != nil {
fmt.Println(err)
}
fmt.Println("done!")
}

View File

@ -0,0 +1,42 @@
package main
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func workerTxHash(ctx context.Context, repo *storage.Repository, line string) error {
tokens := strings.Split(line, ",")
if len(tokens) != 4 {
return fmt.Errorf("invalid line: %s", line)
}
intChainID, err := strconv.ParseInt(tokens[0], 10, 64)
if err != nil {
return fmt.Errorf("error parsing chain id: %v\n", err)
}
now := time.Now()
vaaTxHash := storage.VaaIdTxHashUpdate{
ChainID: vaa.ChainID(intChainID),
Emitter: tokens[1],
Sequence: tokens[2],
TxHash: tokens[3],
UpdatedAt: &now,
}
err = repo.UpsertTxHash(ctx, vaaTxHash)
if err != nil {
return fmt.Errorf("error upserting vaa: %v\n", err)
}
return nil
}

View File

@ -0,0 +1,38 @@
package main
import (
"context"
"encoding/hex"
"fmt"
"strings"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func workerVaa(ctx context.Context, repo *storage.Repository, line string) error {
tokens := strings.Split(line, ",")
//fmt.Printf("bcid %s, emmiter %s, seq %s\n", header[0], header[1], header[2])
if len(tokens) != 2 {
//fmt.Printf("invalid line: %s", line)
return fmt.Errorf("invalid line: %s", line)
}
data, err := hex.DecodeString(tokens[1])
if err != nil {
return fmt.Errorf("error decoding: %v", err)
}
v, err := vaa.Unmarshal(data)
if err != nil {
return fmt.Errorf("error unmarshaling vaa: %v", err)
}
err = repo.UpsertVaa(ctx, v, data)
if err != nil {
return fmt.Errorf("error upserting vaa: %v\n", err)
}
return nil
}

View File

@ -0,0 +1,75 @@
package main
import (
"context"
"fmt"
"os"
"sync"
"github.com/schollz/progressbar/v3"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
type GenericWorker func(ctx context.Context, repo *storage.Repository, item string) error
type Workpool struct {
Workers int
Queue chan string
WG sync.WaitGroup
DB *mongo.Database
Log *zap.Logger
Bar *progressbar.ProgressBar
WorkerFunc GenericWorker
}
func NewWorkpool(ctx context.Context, workers int, workerFunc GenericWorker) *Workpool {
wp := Workpool{
Workers: workers,
Queue: make(chan string, workers*1000),
WG: sync.WaitGroup{},
Log: zap.NewExample(),
WorkerFunc: workerFunc,
}
db, err := storage.GetDB(ctx, wp.Log, os.Getenv("MONGODB_URI"), "wormhole")
if err != nil {
panic(err)
}
wp.DB = db
for i := 0; i < workers; i++ {
go wp.Process(ctx)
}
wp.WG.Add(workers)
return &wp
}
func (w *Workpool) Process(ctx context.Context) error {
repo := storage.NewRepository(w.DB, w.Log)
var err error
for {
select {
case line := <-w.Queue:
if line == "exit" {
w.WG.Done()
return nil
}
err = w.WorkerFunc(ctx, repo, line)
if err != nil {
fmt.Println(err)
break
}
w.Bar.Add(1) // its safe to call Add concurrently
}
}
}

View File

@ -37,6 +37,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/bradfitz/gomemcache v0.0.0-20221031212613-62deef7fc822 // indirect
github.com/briandowns/spinner v1.20.0
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce // indirect
@ -148,7 +149,7 @@ require (
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@ -192,7 +193,7 @@ require (
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/regen-network/cosmos-proto v0.3.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rivo/uniseg v0.4.3 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/rs/zerolog v1.27.0 // indirect
github.com/sasha-s/go-deadlock v0.3.1 // indirect
@ -234,8 +235,8 @@ require (
golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/term v0.2.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/term v0.4.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/tools v0.2.0 // indirect
google.golang.org/genproto v0.0.0-20221114212237-e4508ebdbee1 // indirect
@ -251,6 +252,16 @@ require (
nhooyr.io/websocket v1.8.7 // indirect
)
require (
github.com/fatih/color v1.13.0 // indirect
github.com/schollz/progressbar/v3 v3.13.0
)
require (
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/schollz/progressbar v1.0.0
)
// Needed for cosmos-sdk based chains. See
// https://github.com/cosmos/cosmos-sdk/issues/10925 for more details.
replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

View File

@ -681,6 +681,8 @@ github.com/bradfitz/gomemcache v0.0.0-20221031212613-62deef7fc822 h1:hjXJeBcAMS1
github.com/bradfitz/gomemcache v0.0.0-20221031212613-62deef7fc822/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/breml/bidichk v0.2.3/go.mod h1:8u2C6DnAy0g2cEq+k/A2+tr9O1s+vHGxWn0LTc70T2A=
github.com/breml/errchkjson v0.3.0/go.mod h1:9Cogkyv9gcT8HREpzi3TiqBxCqDzo8awa92zSDFcofU=
github.com/briandowns/spinner v1.20.0 h1:GQq1Yf1KyzYT8CY19GzWrDKP6hYOFB6J72Ks7d8aO1U=
github.com/briandowns/spinner v1.20.0/go.mod h1:TcwZHb7Wb6vn/+bcVv1UXEzaA4pLS7yznHlkY/HzH44=
github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk=
github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ=
github.com/btcsuite/btcd v0.0.0-20190115013929-ed77733ec07d/go.mod h1:d3C0AkH6BRcvO8T0UEPu53cnw4IbV63x1bEjildYhO0=
@ -1143,6 +1145,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
@ -1807,6 +1810,7 @@ github.com/julz/importas v0.1.0/go.mod h1:oSFU2R4XK/P7kNBrnL/FEQlDGN1/6WoxXEjSSX
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
@ -2003,6 +2007,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
@ -2063,6 +2069,8 @@ github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@ -2489,6 +2497,8 @@ github.com/remyoudompheng/go-misc v0.0.0-20190427085024-2d6ac652a50e/go.mod h1:8
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE=
github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@ -2534,6 +2544,10 @@ github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZj
github.com/sashamelentyev/usestdlibvars v1.8.0/go.mod h1:BFt7b5mSVHaaa26ZupiNRV2ODViQBxZZVhtAxAJRrjs=
github.com/sassoftware/go-rpmutils v0.0.0-20190420191620-a8f1baeba37b/go.mod h1:am+Fp8Bt506lA3Rk3QCmSqmYmLMnPDhdDUcosQCAx+I=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/progressbar v1.0.0 h1:gbyFReLHDkZo8mxy/dLWMr+Mpb1MokGJ1FqCiqacjZM=
github.com/schollz/progressbar v1.0.0/go.mod h1:/l9I7PC3L3erOuz54ghIRKUEFcosiWfLvJv+Eq26UMs=
github.com/schollz/progressbar/v3 v3.13.0 h1:9TeeWRcjW2qd05I8Kf9knPkW4vLM/hYoa6z9ABvxje8=
github.com/schollz/progressbar/v3 v3.13.0/go.mod h1:ZBYnSuLAX2LU8P8UiKN/KgF2DY58AJC8yfVYLPC8Ly4=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
@ -3482,6 +3496,8 @@ golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@ -3494,6 +3510,8 @@ golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuX
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0 h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@ -97,6 +97,7 @@ func (s *Repository) UpsertVaa(ctx context.Context, v *vaa.VAA, serializedVaa []
}
func (s *Repository) UpsertObservation(o *gossipv1.SignedObservation) error {
ctx := context.TODO()
vaaID := strings.Split(o.MessageId, "/")
chainIDStr, emitter, sequenceStr := vaaID[0], vaaID[1], vaaID[2]
id := fmt.Sprintf("%s/%s/%s", o.MessageId, hex.EncodeToString(o.Addr), hex.EncodeToString(o.Hash))
@ -136,7 +137,7 @@ func (s *Repository) UpsertObservation(o *gossipv1.SignedObservation) error {
"$setOnInsert": indexedAt(now),
}
opts := options.Update().SetUpsert(true)
_, err = s.collections.observations.UpdateByID(context.TODO(), id, update, opts)
_, err = s.collections.observations.UpdateByID(ctx, id, update, opts)
if err != nil {
s.log.Error("Error inserting observation", zap.Error(err))
return err
@ -150,18 +151,33 @@ func (s *Repository) UpsertObservation(o *gossipv1.SignedObservation) error {
UpdatedAt: &now,
}
updateVaaTxHash := bson.M{
"$set": vaaTxHash,
"$setOnInsert": indexedAt(now),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
_, err = s.collections.vaaIdTxHash.UpdateByID(context.TODO(), o.MessageId, updateVaaTxHash, opts)
err = s.UpsertTxHash(ctx, vaaTxHash)
if err != nil {
s.log.Error("Error inserting vaaIdTxHash", zap.Error(err))
return err
}
return err
}
func (s *Repository) UpsertTxHash(ctx context.Context, vaaTxHash VaaIdTxHashUpdate) error {
id := fmt.Sprintf("%d/%s/%s", vaaTxHash.ChainID, vaaTxHash.Emitter, vaaTxHash.Sequence)
updateVaaTxHash := bson.M{
"$set": vaaTxHash,
"$setOnInsert": indexedAt(time.Now()),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
_, err := s.collections.vaaIdTxHash.UpdateByID(ctx, id, updateVaaTxHash, options.Update().SetUpsert(true))
if err != nil {
s.log.Error("Error inserting vaaIdTxHash", zap.Error(err))
return err
}
return err
}
func (s *Repository) UpsertHeartbeat(hb *gossipv1.Heartbeat) error {