Generate volume metrics for unknown tokens (#348)

### Summary
This pull request fixes an issue in which volume metrics were not generated for unknown tokens (i.e., tokens that are not present in our database).

Additionally, the function that generates volume metrics was modified to handle timestamp collisions.
agodnic, 2023-05-29 10:54:09 -03:00 (committed by GitHub)
commit 4aca29fe7d (parent cb9532b5c1)
5 changed files with 58 additions and 57 deletions
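
For context on the headline fix: the volume measurement previously returned early for tokens missing from the token database, so no point was written at all. The diff to the metric-writing code below instead writes the point with its tags and a single zero `volume` field, because InfluxDB rejects points that carry no fields and the Flux queries filter on the `volume` field. A minimal sketch of such a point using the influxdb2 Go client that this code already depends on (the tag values are invented for illustration):

```go
package main

import (
	"fmt"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Hypothetical tag values; the real ones come from the VAA payload.
	point := influxdb2.NewPointWithMeasurement("vaa_volume").
		AddTag("emitter_chain", "2").
		AddTag("destination_chain", "1").
		AddTag("token_chain", "2").
		// Dummy field: keeps the point accepted by InfluxDB and visible to
		// queries that filter on `_field == "volume"`.
		AddField("volume", uint64(0)).
		SetTime(time.Now())

	// Prints: vaa_volume volume 0
	fmt.Println(point.Name(), point.FieldList()[0].Key, point.FieldList()[0].Value)
}
```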

---

@@ -5,7 +5,7 @@ import (
 	"github.com/wormhole-foundation/wormhole-explorer/analytic/metric"
 	"github.com/wormhole-foundation/wormhole-explorer/analytic/queue"
-	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+	sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
 )
@@ -36,7 +36,7 @@ func (c *Consumer) Start(ctx context.Context) {
 		}
 		// unmarshal vaa.
-		vaa, err := vaa.Unmarshal(event.Vaa)
+		vaa, err := sdk.Unmarshal(event.Vaa)
 		if err != nil {
 			c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
 			msg.Failed()
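
The change above simply aliases the `vaa` package import as `sdk`. The likely motivation: the consumer declares a local variable named `vaa`, which shadows an unaliased `vaa` package for the rest of the scope. A tiny standalone illustration of the same pattern, using `encoding/json` as a stand-in for the wormhole SDK package:

```go
package main

import (
	sdk "encoding/json" // stand-in for: sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
	"fmt"
)

func main() {
	// Naming the local result after the original package is now harmless:
	// the package stays reachable through `sdk`.
	var vaa struct{ Sequence uint64 }
	if err := sdk.Unmarshal([]byte(`{"Sequence": 42}`), &vaa); err != nil {
		panic(err)
	}
	out, _ := sdk.Marshal(vaa) // with an unaliased import, `vaa.Marshal` would hit the variable
	fmt.Println(vaa.Sequence, string(out))
}
```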

---

@@ -128,19 +128,12 @@ func (m *Metric) vaaCountAllMessagesMeasurement(ctx context.Context, vaa *sdk.VAA) error {
 	const measurement = "vaa_count_all_messages"
-	// By the way InfluxDB works, two points with the same timesamp will overwrite each other.
-	// Most VAA timestamps only have millisecond resolution, so it is possible that two VAAs
-	// will have the same timestamp.
-	//
-	// Hence, we add a deterministic number of nanoseconds to the timestamp to avoid collisions.
-	pseudorandomOffset := vaa.Sequence % 1000
 	// Create a new point
 	point := influxdb2.
 		NewPointWithMeasurement(measurement).
 		AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))).
 		AddField("count", 1).
-		SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(pseudorandomOffset)))
+		SetTime(generateUniqueTimestamp(vaa))
 	// Write the point to influx
 	err := m.apiBucket24Hours.WritePoint(ctx, point)
@@ -187,6 +180,12 @@ func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA) error {
 	if err != nil {
 		return err
 	}
+	m.logger.Info("Wrote a data point for the volume metric",
+		zap.String("vaaId", vaa.MessageID()),
+		zap.String("measurement", point.Name()),
+		zap.Any("tags", point.TagList()),
+		zap.Any("fields", point.FieldList()),
+	)
 	return nil
 }
@@ -228,7 +227,7 @@ func MakePointForVaaCount(vaa *sdk.VAA) (*write.Point, error) {
 		NewPointWithMeasurement(measurement).
 		AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))).
 		AddField("count", 1).
-		SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(vaa.Sequence)))
+		SetTime(generateUniqueTimestamp(vaa))
 	return point, nil
 }
@@ -265,19 +264,34 @@ func MakePointForVaaVolume(params *MakePointForVaaVolumeParams) (*write.Point, error) {
 		return nil, nil
 	}
+	// Create a data point
+	point := influxdb2.NewPointWithMeasurement(measurement).
+		// This is always set to the portal token bridge app ID, but we may have other apps in the future
+		AddTag("app_id", domain.AppIdPortalTokenBridge).
+		AddTag("emitter_chain", fmt.Sprintf("%d", params.Vaa.EmitterChain)).
+		// Receiver chain
+		AddTag("destination_chain", fmt.Sprintf("%d", payload.TargetChain)).
+		// Original mint address
+		AddTag("token_address", payload.OriginAddress.String()).
+		// Original mint chain
+		AddTag("token_chain", fmt.Sprintf("%d", payload.OriginChain)).
+		SetTime(params.Vaa.Timestamp)
 	// Get the token metadata
 	//
 	// This is complementary data about the token that is not present in the VAA itself.
 	tokenMeta, ok := domain.GetTokenByAddress(payload.OriginChain, payload.OriginAddress.String())
 	if !ok {
-		if params.Logger != nil {
-			params.Logger.Debug("found no token metadata for VAA",
-				zap.String("vaaId", params.Vaa.MessageID()),
-				zap.String("tokenAddress", payload.OriginAddress.String()),
-				zap.Uint16("tokenChain", uint16(payload.OriginChain)),
-			)
-		}
-		return nil, nil
+		// We don't have metadata for this token, so we can't compute the volume-related fields
+		// (i.e.: amount, notional, volume, symbol, etc.)
+		//
+		// InfluxDB will reject data points that don't have any fields, so we need to
+		// add a dummy field.
+		//
+		// Moreover, many flux queries depend on the existence of the `volume` field,
+		// and would break if we had measurements without it.
+		point.AddField("volume", uint64(0))
+		return point, nil
 	}
 	// Normalize the amount to 8 decimals
@@ -317,37 +331,38 @@ func MakePointForVaaVolume(params *MakePointForVaaVolumeParams) (*write.Point, error) {
 	volume.Mul(amount, notionalBigInt)
 	volume.Div(&volume, big.NewInt(1e8))
-	if params.Logger != nil {
-		params.Logger.Info("Generated data point for volume metric",
-			zap.String("vaaId", params.Vaa.MessageID()),
-			zap.String("amount", amount.String()),
-			zap.String("notional", notionalBigInt.String()),
-			zap.String("volume", volume.String()),
-			zap.String("underlyingSymbol", tokenMeta.UnderlyingSymbol.String()),
-		)
-	}
-	// Create a data point with volume-related fields
+	// Add volume-related fields to the data point.
 	//
 	// We're converting big integers to int64 because influxdb doesn't support bigint/numeric types.
-	point := influxdb2.NewPointWithMeasurement(measurement).
-		// This is always set to the portal token bridge app ID, but we may have other apps in the future
-		AddTag("app_id", domain.AppIdPortalTokenBridge).
-		AddTag("emitter_chain", fmt.Sprintf("%d", params.Vaa.EmitterChain)).
-		// Receiver chain
-		AddTag("destination_chain", fmt.Sprintf("%d", payload.TargetChain)).
-		// Original mint address
-		AddTag("token_address", payload.OriginAddress.String()).
-		// Original mint chain
-		AddTag("token_chain", fmt.Sprintf("%d", payload.OriginChain)).
+	point.
+		AddField("symbol", tokenMeta.UnderlyingSymbol.String()).
 		// Amount of tokens transferred, integer, 8 decimals of precision
 		AddField("amount", amount.Uint64()).
 		// Token price at the time the VAA was emitted, integer, 8 decimals of precision
 		AddField("notional", notionalBigInt.Uint64()).
 		// Volume in USD, integer, 8 decimals of precision
 		AddField("volume", volume.Uint64()).
-		AddField("symbol", tokenMeta.UnderlyingSymbol.String()).
-		SetTime(params.Vaa.Timestamp)
+		SetTime(generateUniqueTimestamp(params.Vaa))
 	return point, nil
 }
+
+// generateUniqueTimestamp generates a unique timestamp for each VAA.
+//
+// Most VAA timestamps only have millisecond resolution, so it is possible that two VAAs
+// will have the same timestamp.
+// By the way InfluxDB works, two points with the same timestamp will overwrite each other.
+//
+// Hence, we are forced to generate a deterministic, unique timestamp for each VAA.
+func generateUniqueTimestamp(vaa *sdk.VAA) time.Time {
+	// We add a one-nanosecond offset per unit of sequence number,
+	// taking the sequence modulo 10^6 to ensure that the offset
+	// always stays below one millisecond.
+	//
+	// We could also hash the chain, emitter and sequence fields,
+	// but the current approach is good enough for the time being.
+	offset := time.Duration(vaa.Sequence % 1_000_000)
+	return vaa.Timestamp.Add(time.Nanosecond * offset)
+}
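
A quick worked example of the collision handling added above: a VAA with sequence 3,141,592 gets an offset of 3141592 mod 10^6 = 141,592 ns, so two VAAs sharing a timestamp still collide only if their sequences are congruent modulo 10^6 (the hash-based alternative mentioned in the comment would close that gap). A self-contained sketch, re-declaring the helper against a stand-in for the two `sdk.VAA` fields it reads:

```go
package main

import (
	"fmt"
	"time"
)

// Stand-in for the two sdk.VAA fields the helper uses.
type VAA struct {
	Timestamp time.Time
	Sequence  uint64
}

// Mirrors the generateUniqueTimestamp helper added in this commit.
func generateUniqueTimestamp(vaa *VAA) time.Time {
	offset := time.Duration(vaa.Sequence % 1_000_000)
	return vaa.Timestamp.Add(time.Nanosecond * offset)
}

func main() {
	ts := time.Date(2023, 5, 29, 10, 54, 9, 0, time.UTC)
	a := VAA{Timestamp: ts, Sequence: 3_141_592}
	b := VAA{Timestamp: ts, Sequence: 3_141_593}
	fmt.Println(generateUniqueTimestamp(&a).Nanosecond()) // 141592
	fmt.Println(generateUniqueTimestamp(&b).Nanosecond()) // 141593
}
```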

---

@@ -12,7 +12,6 @@ from(bucket: "wormscan")
   |> range(start: start, stop: stop)
   |> filter(fn: (r) => r["_measurement"] == "vaa_volume")
   |> filter(fn: (r) => r["_field"] == "volume")
-  |> drop(columns: ["app_id", "destination_address", "destination_chain", "symbol"])
   |> group(columns: ["emitter_chain", "token_address", "token_chain"])
   |> sum(column: "_value")
   |> set(key: "_measurement", value: "asset_volumes_24h")

---

@@ -12,7 +12,6 @@ from(bucket: "wormscan")
   |> range(start: start, stop: stop)
   |> filter(fn: (r) => r["_measurement"] == "vaa_volume")
   |> filter(fn: (r) => r["_field"] == "volume")
-  |> drop(columns: ["app_id", "destination_address", "token_address", "token_chain", "_field"])
   |> group(columns: ["emitter_chain", "destination_chain"])
   |> count(column: "_value")
   |> set(key: "_measurement", value: "chain_pair_transfers_24h")

---

@@ -52,17 +52,7 @@ from(bucket: "%s")
   |> range(start: -24h)
   |> filter(fn: (r) => r._measurement == "vaa_volume")
   |> filter(fn:(r) => r._field == "volume")
-  |> drop(columns: [
-    "_measurement",
-    "app_id",
-    "destination_address",
-    "destination_chain",
-    "emitter_chain",
-    "token_address",
-    "token_chain",
-    "symbol"
-  ])
-  |> group(columns: ["emitter_chain", "token_address", "token_chain"])
+  |> group()
   |> sum(column: "_value")
 `
@@ -136,8 +126,6 @@ raw = from(bucket: "%s")
   |> filter(fn: (r) => r["_measurement"] == "vaa_volume")
   |> filter(fn: (r) => r["_field"] == "volume")
   |> group(columns: ["emitter_chain", "destination_chain"])
-  |> drop(columns: ["app_id", "destination_address", "token_address", "token_chain", "_field"])
-  |> group(columns: ["emitter_chain", "destination_chain"])
   |> count()
   // Merge all results, compute the sum, return the top 7 volumes.