From 4aca29fe7d3da4aff6dfc78fe6ab09ec76fd44c4 Mon Sep 17 00:00:00 2001 From: agodnic Date: Mon, 29 May 2023 10:54:09 -0300 Subject: [PATCH] Generate volume metrics for unknown tokens (#348) ### Summary This pull request fixes an issue in which volume metrics were not being generated for unknown tokens (i.e.: tokens that are not present in our database). Also, the function that generated volume metrics was modified in order to handle timestamp collisions. --- analytic/consumer/consumer.go | 4 +- analytic/metric/metric.go | 95 +++++++++++-------- analytic/scripts/asset_volumes_24h.flux | 1 - .../scripts/chain_pair_transfers_24h.flux | 1 - api/handlers/transactions/repository.go | 14 +-- 5 files changed, 58 insertions(+), 57 deletions(-) diff --git a/analytic/consumer/consumer.go b/analytic/consumer/consumer.go index 1f6be5d4..673f72a5 100644 --- a/analytic/consumer/consumer.go +++ b/analytic/consumer/consumer.go @@ -5,7 +5,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/analytic/metric" "github.com/wormhole-foundation/wormhole-explorer/analytic/queue" - "github.com/wormhole-foundation/wormhole/sdk/vaa" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -36,7 +36,7 @@ func (c *Consumer) Start(ctx context.Context) { } // unmarshal vaa. - vaa, err := vaa.Unmarshal(event.Vaa) + vaa, err := sdk.Unmarshal(event.Vaa) if err != nil { c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err)) msg.Failed() diff --git a/analytic/metric/metric.go b/analytic/metric/metric.go index 7db44b94..4e1c46e1 100644 --- a/analytic/metric/metric.go +++ b/analytic/metric/metric.go @@ -128,19 +128,12 @@ func (m *Metric) vaaCountAllMessagesMeasurement(ctx context.Context, vaa *sdk.VA const measurement = "vaa_count_all_messages" - // By the way InfluxDB works, two points with the same timesamp will overwrite each other. 
- // Most VAA timestamps only have millisecond resolution, so it is possible that two VAAs - // will have the same timestamp. - // - // Hence, we add a deterministic number of nanoseconds to the timestamp to avoid collisions. - pseudorandomOffset := vaa.Sequence % 1000 - // Create a new point point := influxdb2. NewPointWithMeasurement(measurement). AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))). AddField("count", 1). - SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(pseudorandomOffset))) + SetTime(generateUniqueTimestamp(vaa)) // Write the point to influx err := m.apiBucket24Hours.WritePoint(ctx, point) @@ -187,6 +180,12 @@ func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA) error { if err != nil { return err } + m.logger.Info("Wrote a data point for the volume metric", + zap.String("vaaId", vaa.MessageID()), + zap.String("measurement", point.Name()), + zap.Any("tags", point.TagList()), + zap.Any("fields", point.FieldList()), + ) return nil } @@ -228,7 +227,7 @@ func MakePointForVaaCount(vaa *sdk.VAA) (*write.Point, error) { NewPointWithMeasurement(measurement). AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))). AddField("count", 1). - SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(vaa.Sequence))) + SetTime(generateUniqueTimestamp(vaa)) return point, nil } @@ -265,19 +264,34 @@ func MakePointForVaaVolume(params *MakePointForVaaVolumeParams) (*write.Point, e return nil, nil } + // Create a data point + point := influxdb2.NewPointWithMeasurement(measurement). + // This is always set to the portal token bridge app ID, but we may have other apps in the future + AddTag("app_id", domain.AppIdPortalTokenBridge). + AddTag("emitter_chain", fmt.Sprintf("%d", params.Vaa.EmitterChain)). + // Receiver chain + AddTag("destination_chain", fmt.Sprintf("%d", payload.TargetChain)). + // Original mint address + AddTag("token_address", payload.OriginAddress.String()). 
+ // Original mint chain + AddTag("token_chain", fmt.Sprintf("%d", payload.OriginChain)). + SetTime(params.Vaa.Timestamp) + // Get the token metadata // // This is complementary data about the token that is not present in the VAA itself. tokenMeta, ok := domain.GetTokenByAddress(payload.OriginChain, payload.OriginAddress.String()) if !ok { - if params.Logger != nil { - params.Logger.Debug("found no token metadata for VAA", - zap.String("vaaId", params.Vaa.MessageID()), - zap.String("tokenAddress", payload.OriginAddress.String()), - zap.Uint16("tokenChain", uint16(payload.OriginChain)), - ) - } - return nil, nil + // We don't have metadata for this token, so we can't compute the volume-related fields + // (i.e.: amount, notional, volume, symbol, etc.) + // + // InfluxDB will reject data points that don't have any fields, so we need to + // add a dummy field. + // + // Moreover, many flux queries depend on the existence of the `volume` field, + // and would break if we had measurements without it. + point.AddField("volume", uint64(0)) + return point, nil } // Normalize the amount to 8 decimals @@ -317,37 +331,38 @@ func MakePointForVaaVolume(params *MakePointForVaaVolumeParams) (*write.Point, e volume.Mul(amount, notionalBigInt) volume.Div(&volume, big.NewInt(1e8)) - if params.Logger != nil { - params.Logger.Info("Generated data point for volume metric", - zap.String("vaaId", params.Vaa.MessageID()), - zap.String("amount", amount.String()), - zap.String("notional", notionalBigInt.String()), - zap.String("volume", volume.String()), - zap.String("underlyingSymbol", tokenMeta.UnderlyingSymbol.String()), - ) - } - - // Create a data point with volume-related fields + // Add volume-related fields to the data point. // // We're converting big integers to int64 because influxdb doesn't support bigint/numeric types. - point := influxdb2.NewPointWithMeasurement(measurement). 
- // This is always set to the portal token bridge app ID
- AddTag("app_id", domain.AppIdPortalTokenBridge).
- AddTag("emitter_chain", fmt.Sprintf("%d", params.Vaa.EmitterChain)).
- // Receiver chain
- AddTag("destination_chain", fmt.Sprintf("%d", payload.TargetChain)).
- // Original mint address
- AddTag("token_address", payload.OriginAddress.String()).
- // Original mint chain
- AddTag("token_chain", fmt.Sprintf("%d", payload.OriginChain)).
+ point.
+ AddField("symbol", tokenMeta.UnderlyingSymbol.String()).
// Amount of tokens transferred, integer, 8 decimals of precision
AddField("amount", amount.Uint64()).
// Token price at the time the VAA was emitted, integer, 8 decimals of precision
AddField("notional", notionalBigInt.Uint64()).
// Volume in USD, integer, 8 decimals of precision
AddField("volume", volume.Uint64()).
- AddField("symbol", tokenMeta.UnderlyingSymbol.String()).
- SetTime(params.Vaa.Timestamp)
+ SetTime(generateUniqueTimestamp(params.Vaa))

return point, nil
}
+
+// generateUniqueTimestamp generates a unique timestamp for each VAA.
+//
+// Most VAA timestamps only have millisecond resolution, so it is possible that two VAAs
+// will have the same timestamp.
+// By the way InfluxDB works, two points with the same timestamp will overwrite each other.
+//
+// Hence, we are forced to generate a deterministic unique timestamp for each VAA.
+func generateUniqueTimestamp(vaa *sdk.VAA) time.Time {
+
+ // We're adding a 1-nanosecond offset per sequence.
+ // Then, we're taking the modulo of 10^6 to ensure that the offset
+ // will always be lower than one millisecond.
+ //
+ // We could also hash the chain, emitter and seq fields,
+ // but the current approach is good enough for the time being. 
+ offset := time.Duration(vaa.Sequence % 1_000_000) + + return vaa.Timestamp.Add(time.Nanosecond * offset) +} diff --git a/analytic/scripts/asset_volumes_24h.flux b/analytic/scripts/asset_volumes_24h.flux index 280ed875..b70dc92a 100644 --- a/analytic/scripts/asset_volumes_24h.flux +++ b/analytic/scripts/asset_volumes_24h.flux @@ -12,7 +12,6 @@ from(bucket: "wormscan") |> range(start: start, stop: stop) |> filter(fn: (r) => r["_measurement"] == "vaa_volume") |> filter(fn: (r) => r["_field"] == "volume") - |> drop(columns: ["app_id", "destination_address", "destination_chain", "symbol"]) |> group(columns: ["emitter_chain", "token_address", "token_chain"]) |> sum(column: "_value") |> set(key: "_measurement", value: "asset_volumes_24h") diff --git a/analytic/scripts/chain_pair_transfers_24h.flux b/analytic/scripts/chain_pair_transfers_24h.flux index e710ec96..0bb3b1d6 100644 --- a/analytic/scripts/chain_pair_transfers_24h.flux +++ b/analytic/scripts/chain_pair_transfers_24h.flux @@ -12,7 +12,6 @@ from(bucket: "wormscan") |> range(start: start, stop: stop) |> filter(fn: (r) => r["_measurement"] == "vaa_volume") |> filter(fn: (r) => r["_field"] == "volume") - |> drop(columns: ["app_id", "destination_address", "token_address", "token_chain", "_field"]) |> group(columns: ["emitter_chain", "destination_chain"]) |> count(column: "_value") |> set(key: "_measurement", value: "chain_pair_transfers_24h") diff --git a/api/handlers/transactions/repository.go b/api/handlers/transactions/repository.go index b2852168..10b223cf 100644 --- a/api/handlers/transactions/repository.go +++ b/api/handlers/transactions/repository.go @@ -52,17 +52,7 @@ from(bucket: "%s") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "vaa_volume") |> filter(fn:(r) => r._field == "volume") - |> drop(columns: [ - "_measurement", - "app_id", - "destination_address", - "destination_chain", - "emitter_chain", - "token_address", - "token_chain", - "symbol" - ]) - |> group(columns: ["emitter_chain", 
"token_address", "token_chain"]) + |> group() |> sum(column: "_value") ` @@ -136,8 +126,6 @@ raw = from(bucket: "%s") |> filter(fn: (r) => r["_measurement"] == "vaa_volume") |> filter(fn: (r) => r["_field"] == "volume") |> group(columns: ["emitter_chain", "destination_chain"]) - |> drop(columns: ["app_id", "destination_address", "token_address", "token_chain", "_field"]) - |> group(columns: ["emitter_chain", "destination_chain"]) |> count() // Merge all results, compute the sum, return the top 7 volumes.