Fix vaa_volume measurement in analytics and api queries (#719)

Co-authored-by: walker-16 <agpazos85@gmail.com>
This commit is contained in:
ftocal 2023-10-02 11:21:13 -03:00 committed by GitHub
parent 504c6a2bf4
commit 6be2607c65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 97 additions and 96 deletions

View File

@ -21,9 +21,9 @@ import (
)
const (
vaaCountMeasurement = "vaa_count"
vaaVolumeMeasurement = "vaa_volume"
vaaAllMessagesMeasurement = "vaa_count_all_messages"
VaaCountMeasurement = "vaa_count"
VaaVolumeMeasurement = "vaa_volume_v2"
VaaAllMessagesMeasurement = "vaa_count_all_messages"
)
// Metric definition.
@ -155,10 +155,10 @@ func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *sdk.VAA) error {
zap.Uint16("chain_id", uint16(vaa.EmitterChain)),
zap.Error(err),
)
m.metrics.IncFailedMeasurement(vaaCountMeasurement)
m.metrics.IncFailedMeasurement(VaaCountMeasurement)
return err
}
m.metrics.IncSuccessfulMeasurement(vaaCountMeasurement)
m.metrics.IncSuccessfulMeasurement(VaaCountMeasurement)
return nil
}
@ -179,7 +179,7 @@ func (m *Metric) vaaCountAllMessagesMeasurement(ctx context.Context, vaa *sdk.VA
// Create a new point
point := influxdb2.
NewPointWithMeasurement(vaaAllMessagesMeasurement).
NewPointWithMeasurement(VaaAllMessagesMeasurement).
AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))).
AddField("count", 1).
SetTime(generateUniqueTimestamp(vaa))
@ -188,19 +188,19 @@ func (m *Metric) vaaCountAllMessagesMeasurement(ctx context.Context, vaa *sdk.VA
err := m.apiBucket24Hours.WritePoint(ctx, point)
if err != nil {
m.logger.Error("failed to write metric",
zap.String("measurement", vaaAllMessagesMeasurement),
zap.String("measurement", VaaAllMessagesMeasurement),
zap.Uint16("chain_id", uint16(vaa.EmitterChain)),
zap.Error(err),
)
m.metrics.IncFailedMeasurement(vaaAllMessagesMeasurement)
m.metrics.IncFailedMeasurement(VaaAllMessagesMeasurement)
return err
}
m.metrics.IncSuccessfulMeasurement(vaaAllMessagesMeasurement)
m.metrics.IncSuccessfulMeasurement(VaaAllMessagesMeasurement)
return nil
}
// volumeMeasurement creates a new point for the `vaa_volume` measurement.
// volumeMeasurement creates a new point for the `vaa_volume_v2` measurement.
func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA, token *token.TransferredToken) error {
// Generate a data point for the volume metric
@ -231,7 +231,7 @@ func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA, token *tok
// Write the point to influx
err = m.apiBucketInfinite.WritePoint(ctx, point)
if err != nil {
m.metrics.IncFailedMeasurement(vaaVolumeMeasurement)
m.metrics.IncFailedMeasurement(VaaVolumeMeasurement)
return err
}
m.logger.Info("Wrote a data point for the volume metric",
@ -240,7 +240,7 @@ func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA, token *tok
zap.Any("tags", point.TagList()),
zap.Any("fields", point.FieldList()),
)
m.metrics.IncSuccessfulMeasurement(vaaVolumeMeasurement)
m.metrics.IncSuccessfulMeasurement(VaaVolumeMeasurement)
return nil
}
@ -258,7 +258,7 @@ func MakePointForVaaCount(vaa *sdk.VAA) (*write.Point, error) {
// Create a new point
point := influxdb2.
NewPointWithMeasurement(vaaCountMeasurement).
NewPointWithMeasurement(VaaCountMeasurement).
AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))).
AddField("count", 1).
SetTime(generateUniqueTimestamp(vaa))
@ -318,7 +318,7 @@ func MakePointForVaaVolume(params *MakePointForVaaVolumeParams) (*write.Point, e
}
// Create a data point
point := influxdb2.NewPointWithMeasurement(vaaVolumeMeasurement).
point := influxdb2.NewPointWithMeasurement(VaaVolumeMeasurement).
// This is always set to the portal token bridge app ID, but we may have other apps in the future
AddTag("app_id", params.TransferredToken.AppId).
AddTag("emitter_chain", fmt.Sprintf("%d", params.Vaa.EmitterChain)).
@ -328,6 +328,8 @@ func MakePointForVaaVolume(params *MakePointForVaaVolumeParams) (*write.Point, e
AddTag("token_address", params.TransferredToken.TokenAddress.String()).
// Original mint chain
AddTag("token_chain", fmt.Sprintf("%d", params.TransferredToken.TokenChain)).
// Measurement version
AddTag("version", "v2").
SetTime(params.Vaa.Timestamp)
// Get the token metadata

View File

@ -5,16 +5,19 @@ option task = {
every: 24h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-30days"
start = date.truncate(t: -24h, unit: 24h)
stop = date.truncate(t: now(), unit: 24h)
from(bucket: "wormscan")
from(bucket: sourceBucket)
|> range(start: start, stop: stop)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group(columns: ["emitter_chain", "token_address", "token_chain"])
|> sum(column: "_value")
|> set(key: "_measurement", value: "asset_volumes_24h")
|> set(key: "_measurement", value: "asset_volumes_24h_v2")
|> set(key: "_field", value: "volume")
|> map(fn: (r) => ({r with _time: start}))
|> to(bucket: "wormscan-30days")
|> to(bucket: destinationBucket)

View File

@ -12,20 +12,20 @@ start = date.truncate(t: -15d, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_15_days_3h")
|> set(key: "_measurement", value: "chain_activity_15_days_3h_v2")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_15_days_3h")
|> set(key: "_measurement", value: "chain_activity_15_days_3h_v2")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -12,20 +12,20 @@ start = date.truncate(t: -1y, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_1_year_3h")
|> set(key: "_measurement", value: "chain_activity_1_year_3h_v2")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_1_year_3h")
|> set(key: "_measurement", value: "chain_activity_1_year_3h_v2")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -12,20 +12,20 @@ start = date.truncate(t: -30d, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_30_days_3h")
|> set(key: "_measurement", value: "chain_activity_30_days_3h_v2")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_30_days_3h")
|> set(key: "_measurement", value: "chain_activity_30_days_3h_v2")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -12,20 +12,20 @@ start = date.truncate(t: -7d, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_7_days_3h")
|> set(key: "_measurement", value: "chain_activity_7_days_3h_v2")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_7_days_3h")
|> set(key: "_measurement", value: "chain_activity_7_days_3h_v2")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -12,20 +12,20 @@ start = date.truncate(t: -90d, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_90_days_3h")
|> set(key: "_measurement", value: "chain_activity_90_days_3h_v2")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_90_days_3h")
|> set(key: "_measurement", value: "chain_activity_90_days_3h_v2")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -11,20 +11,20 @@ execution = date.truncate(t: now(), unit: 1h)
from(bucket: sourceBucket)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_all_time_3h")
|> set(key: "_measurement", value: "chain_activity_all_time_3h_v2")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_all_time_3h")
|> set(key: "_measurement", value: "chain_activity_all_time_3h_v2")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,30 @@
import "date"
option task = {
name: "total tx for all time every 24-hours",
every: 24h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-30days"
stop = date.truncate(t: now(), unit: 24h)
from(bucket: sourceBucket)
|> range(start: 1970-01-01T00:00:00Z, stop: stop)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> count()
|> map(fn: (r) => ({ _time: r._stop, _value: r._value, _measurement: "total_tx_count_v2", _field: "value" }))
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: 1970-01-01T00:00:00Z, stop: stop)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> sum()
|> map(fn: (r) => ({ _time: r._stop, _value: r._value, _measurement: "total_tx_volume_v2", _field: "value" }))
|> to(bucket: destinationBucket)

View File

@ -1,17 +0,0 @@
import "date"
option task = {
name: "total tx count by portal bridge",
every: 24h,
}
stop = date.truncate(t: now(), unit: 24h)
from(bucket: "wormscan")
|> range(start: 1970-01-01T00:00:00Z, stop: stop)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> count()
|> map(fn: (r) => ({ _time: r._stop, _value: r._value, _measurement: "total_tx_count", _field: "value" }))
|> to(bucket: "wormscan-30days")

View File

@ -1,17 +0,0 @@
import "date"
option task = {
name: "total tx volume by portal bridge",
every: 24h,
}
stop = date.truncate(t: now(), unit: 24h)
from(bucket: "wormscan")
|> range(start: 1970-01-01T00:00:00Z, stop: stop)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> sum()
|> map(fn: (r) => ({ _time: r._stop, _value: r._value, _measurement: "total_tx_volume", _field: "value" }))
|> to(bucket: "wormscan-30days")

View File

@ -68,13 +68,13 @@ func createRangeQuery(t time.Time, timeSpan string) (string, string) {
const queryTemplateTotalTrxCount = `
current = from(bucket: "%s")
|> range(start: %s)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> count()
last = from(bucket: "%s")
|> range(start: -1mo)
|> filter(fn: (r) => r["_measurement"] == "total_tx_count")
|> filter(fn: (r) => r["_measurement"] == "total_tx_count_v2")
|> last()
union(tables: [current, last])
|> group()
@ -89,13 +89,13 @@ func buildTotalTrxCountQuery(bucketForever, bucket30Days string, t time.Time) st
const queryTemplateTotalTrxVolume = `
current = from(bucket: "%s")
|> range(start: %s)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> sum()
last = from(bucket: "%s")
|> range(start: -1mo)
|> filter(fn: (r) => r["_measurement"] == "total_tx_volume")
|> filter(fn: (r) => r["_measurement"] == "total_tx_volume_v2")
|> last()
union(tables: [current, last])
|> group()

View File

@ -91,13 +91,13 @@ func TestQueries_buildTotalTrxCountQuery(t *testing.T) {
expected := `
current = from(bucket: "bucket-forever")
|> range(start: 2023-05-12T00:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> count()
last = from(bucket: "bucket-30days")
|> range(start: -1mo)
|> filter(fn: (r) => r["_measurement"] == "total_tx_count")
|> filter(fn: (r) => r["_measurement"] == "total_tx_count_v2")
|> last()
union(tables: [current, last])
|> group()
@ -114,13 +114,13 @@ func TestQueries_buildTotalTrxVolumeQuery(t *testing.T) {
expected := `
current = from(bucket: "bucket-forever")
|> range(start: 2023-05-10T00:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group()
|> sum()
last = from(bucket: "bucket-30days")
|> range(start: -1mo)
|> filter(fn: (r) => r["_measurement"] == "total_tx_volume")
|> filter(fn: (r) => r["_measurement"] == "total_tx_volume_v2")
|> last()
union(tables: [current, last])
|> group()

View File

@ -53,7 +53,7 @@ from(bucket: "%s")
const queryTemplateVolume24h = `
from(bucket: "%s")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "vaa_volume")
|> filter(fn: (r) => r._measurement == "vaa_volume_v2")
|> filter(fn:(r) => r._field == "volume")
|> group()
|> sum(column: "_value")
@ -91,7 +91,7 @@ import "date"
// Get historic volumes from the summarized metric.
summarized = from(bucket: "%s")
|> range(start: -%s)
|> filter(fn: (r) => r["_measurement"] == "asset_volumes_24h")
|> filter(fn: (r) => r["_measurement"] == "asset_volumes_24h_v2")
|> group(columns: ["emitter_chain", "token_address", "token_chain"])
// Get the current day's volume from the unsummarized metric.
@ -99,7 +99,7 @@ summarized = from(bucket: "%s")
startOfDay = date.truncate(t: now(), unit: 1d)
raw = from(bucket: "%s")
|> range(start: startOfDay)
|> filter(fn: (r) => r["_measurement"] == "vaa_volume")
|> filter(fn: (r) => r["_measurement"] == "vaa_volume_v2")
|> filter(fn: (r) => r["_field"] == "volume")
|> group(columns: ["emitter_chain", "token_address", "token_chain"])
@ -238,11 +238,11 @@ func (r *Repository) GetTopChainPairs(ctx context.Context, timeSpan *TopStatisti
var measurement string
switch *timeSpan {
case TimeSpan7Days:
measurement = "chain_activity_7_days_3h"
measurement = "chain_activity_7_days_3h_v2"
case TimeSpan15Days:
measurement = "chain_activity_15_days_3h"
measurement = "chain_activity_15_days_3h_v2"
case TimeSpan30Days:
measurement = "chain_activity_30_days_3h"
measurement = "chain_activity_30_days_3h_v2"
}
// Submit the query to InfluxDB
@ -378,17 +378,17 @@ func (r *Repository) buildChainActivityQuery(q *ChainActivityQuery) string {
var measurement string
switch q.TimeSpan {
case ChainActivityTs7Days:
measurement = "chain_activity_7_days_3h"
measurement = "chain_activity_7_days_3h_v2"
case ChainActivityTs30Days:
measurement = "chain_activity_30_days_3h"
measurement = "chain_activity_30_days_3h_v2"
case ChainActivityTs90Days:
measurement = "chain_activity_90_days_3h"
measurement = "chain_activity_90_days_3h_v2"
case ChainActivityTs1Year:
measurement = "chain_activity_1_year_3h"
measurement = "chain_activity_1_year_3h_v2"
case ChainActivityTsAllTime:
measurement = "chain_activity_all_time_3h"
measurement = "chain_activity_all_time_3h_v2"
default:
measurement = "chain_activity_7_days_3h"
measurement = "chain_activity_7_days_3h_v2"
}
//today without hours
start := time.Now().Truncate(24 * time.Hour).UTC().Format(time.RFC3339)