Start writing vaa_volume_v3 (#1438)

* start writing vaa_volume_v3

* fill with none

* override version tag

* fix filling of appIds tags

* add size tag and bump measurement version

* change script in order to contemplate emitter and destionation chain

* change script for totals

* change grouping

* chaange variavble in script

* fix indent

* split into 2 tasks

* rename

* more changes

* fix scripts

* revert changes
This commit is contained in:
Mariano 2024-05-29 12:26:00 -03:00 committed by GitHub
parent 4c4bf3ee90
commit 1602a74748
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 45 additions and 1 deletions

View File

@ -32,6 +32,7 @@ func IsUnknownTokenErr(err error) bool {
type TransferredToken struct {
AppId string
AppIDs []string
FromChain sdk.ChainID
ToChain sdk.ChainID
TokenAddress sdk.Address
@ -49,6 +50,7 @@ func (t *TransferredToken) Clone() *TransferredToken {
}
return &TransferredToken{
AppId: t.AppId,
AppIDs: t.AppIDs,
FromChain: t.FromChain,
ToChain: t.ToChain,
TokenAddress: t.TokenAddress,
@ -161,6 +163,7 @@ func createToken(p *parser.ParseVaaWithStandarizedPropertiesdResponse, emitterCh
return &TransferredToken{
AppId: appId,
AppIDs: p.StandardizedProperties.AppIds,
FromChain: emitterChain,
ToChain: p.StandardizedProperties.ToChain,
TokenAddress: address,

View File

@ -269,8 +269,10 @@ func (m *Metric) volumeMeasurement(ctx context.Context, params *Params, token *t
return nil
}
vaaVolumeV3point := m.makePointVaaVolumeV3(point, params, token)
// Write the point to influx
err = m.apiBucketInfinite.WritePoint(ctx, point)
err = m.apiBucketInfinite.WritePoint(ctx, point, vaaVolumeV3point)
if err != nil {
m.metrics.IncFailedMeasurement(VaaVolumeMeasurement)
return err
@ -287,6 +289,45 @@ func (m *Metric) volumeMeasurement(ctx context.Context, params *Params, token *t
return nil
}
func (m *Metric) makePointVaaVolumeV3(vaaVolumeV2Point *write.Point, params *Params, transferredToken *token.TransferredToken) *write.Point {
point := influxdb2.NewPointWithMeasurement("vaa_volume_v3")
point.SetTime(vaaVolumeV2Point.Time())
for _, field := range vaaVolumeV2Point.FieldList() {
point.AddField(field.Key, field.Value)
}
for _, tag := range vaaVolumeV2Point.TagList() {
if tag.Key != "app_id" {
point.AddTag(tag.Key, tag.Value)
}
}
point.AddTag("version", "v5")
for i, appID := range transferredToken.AppIDs {
point.AddTag(fmt.Sprintf("app_id_%d", i+1), appID)
}
// fill with none app_id_2/3 depending on the number of appIDs to ensure that all data points contain the 3 tags.
for i := len(transferredToken.AppIDs); i < 3; i++ {
point.AddTag(fmt.Sprintf("app_id_%d", i+1), "none")
}
point.AddTag("size", strconv.Itoa(len(transferredToken.AppIDs)))
if len(transferredToken.AppIDs) > 3 {
m.logger.Warn("Too many appIDs.",
zap.String("vaaId", params.Vaa.MessageID()),
zap.String("trackId", params.TrackID),
zap.String("appIDs", fmt.Sprintf("%v", transferredToken.AppIDs)))
}
return point
}
// MakePointForVaaCount generates a data point for the VAA count measurement.
//
// Some VAAs will not generate a measurement, so the caller must always check