Increment observation tx hash cache expiration (#1184)

* Increment observation tx hash cache expiration

* Add cache dedup for pyth vaas

* Increment observation dedup cache

* wip
This commit is contained in:
ftocal 2024-03-12 12:23:29 -03:00 committed by GitHub
parent 83a64b758a
commit 67d0e970f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 74 additions and 40 deletions

View File

@ -24,12 +24,16 @@ HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
REDIS_VAA_CHANNEL=gossip-signed-vaas
OBSERVATIONS_DEDUP_CACHE_EXPIRATION_SECONDS=30
OBSERVATIONS_DEDUP_CACHE_EXPIRATION_SECONDS=300
OBSERVATIONS_DEDUP_CACHE_NUM_KEYS=100000
OBSERVATIONS_DEDUP_CACHE_MAX_COSTS_MB=20
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=5
OBSERVATIONS_TX_HASH_CACHE_NUM_KEYS=10000000
OBSERVATIONS_DEDUP_CACHE_MAX_COSTS_MB=30
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=300
OBSERVATIONS_TX_HASH_CACHE_NUM_KEYS=100000
OBSERVATIONS_TX_HASH_CACHE_MAX_COSTS_MB=50
VAAS_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_DEDUP_CACHE_NUM_KEYS=100000
VAAS_DEDUP_CACHE_MAX_COSTS_MB=20
VAAS_PYTH_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_PYTH_DEDUP_CACHE_NUM_KEYS=100000
VAAS_PYTH_DEDUP_CACHE_MAX_COSTS_MB=20

View File

@ -27,9 +27,12 @@ REDIS_VAA_CHANNEL=gossip-signed-vaas
OBSERVATIONS_DEDUP_CACHE_EXPIRATION_SECONDS=30
OBSERVATIONS_DEDUP_CACHE_NUM_KEYS=1000
OBSERVATIONS_DEDUP_CACHE_MAX_COSTS_MB=10
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=5
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=300
OBSERVATIONS_TX_HASH_CACHE_NUM_KEYS=100000
OBSERVATIONS_TX_HASH_CACHE_MAX_COSTS_MB=20
VAAS_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_DEDUP_CACHE_NUM_KEYS=1000
VAAS_DEDUP_CACHE_MAX_COSTS_MB=10
VAAS_PYTH_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_PYTH_DEDUP_CACHE_NUM_KEYS=1000
VAAS_PYTH_DEDUP_CACHE_MAX_COSTS_MB=10

View File

@ -24,12 +24,15 @@ HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
REDIS_VAA_CHANNEL=gossip-signed-vaas
OBSERVATIONS_DEDUP_CACHE_EXPIRATION_SECONDS=30
OBSERVATIONS_DEDUP_CACHE_EXPIRATION_SECONDS=300
OBSERVATIONS_DEDUP_CACHE_NUM_KEYS=100000
OBSERVATIONS_DEDUP_CACHE_MAX_COSTS_MB=20
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=5
OBSERVATIONS_TX_HASH_CACHE_NUM_KEYS=10000000
OBSERVATIONS_DEDUP_CACHE_MAX_COSTS_MB=30
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=300
OBSERVATIONS_TX_HASH_CACHE_NUM_KEYS=100000
OBSERVATIONS_TX_HASH_CACHE_MAX_COSTS_MB=50
VAAS_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_DEDUP_CACHE_NUM_KEYS=100000
VAAS_DEDUP_CACHE_MAX_COSTS_MB=20
VAAS_PYTH_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_PYTH_DEDUP_CACHE_NUM_KEYS=100000
VAAS_PYTH_DEDUP_CACHE_MAX_COSTS_MB=20

View File

@ -27,9 +27,12 @@ REDIS_VAA_CHANNEL=gossip-signed-vaas
OBSERVATIONS_DEDUP_CACHE_EXPIRATION_SECONDS=30
OBSERVATIONS_DEDUP_CACHE_NUM_KEYS=1000
OBSERVATIONS_DEDUP_CACHE_MAX_COSTS_MB=10
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=5
OBSERVATIONS_TX_HASH_CACHE_EXPIRATION_SECONDS=300
OBSERVATIONS_TX_HASH_CACHE_NUM_KEYS=100000
OBSERVATIONS_TX_HASH_CACHE_MAX_COSTS_MB=20
VAAS_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_DEDUP_CACHE_NUM_KEYS=1000
VAAS_DEDUP_CACHE_MAX_COSTS_MB=10
VAAS_PYTH_DEDUP_CACHE_EXPIRATION_SECONDS=30
VAAS_PYTH_DEDUP_CACHE_NUM_KEYS=1000
VAAS_PYTH_DEDUP_CACHE_MAX_COSTS_MB=10

View File

@ -135,6 +135,12 @@ spec:
value: "{{ .VAAS_DEDUP_CACHE_NUM_KEYS }}"
- name: VAAS_DEDUP_CACHE_MAX_COSTS_MB
value: "{{ .VAAS_DEDUP_CACHE_MAX_COSTS_MB }}"
- name: VAAS_PYTH_DEDUP_CACHE_EXPIRATION_SECONDS
value: "{{ .VAAS_PYTH_DEDUP_CACHE_EXPIRATION_SECONDS }}"
- name: VAAS_PYTH_DEDUP_CACHE_NUM_KEYS
value: "{{ .VAAS_PYTH_DEDUP_CACHE_NUM_KEYS }}"
- name: VAAS_PYTH_DEDUP_CACHE_MAX_COSTS_MB
value: "{{ .VAAS_PYTH_DEDUP_CACHE_MAX_COSTS_MB }}"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}

View File

@ -48,7 +48,7 @@ func NewTxHashStore(ctx context.Context, config *config.Configuration, metrics m
if err != nil {
return nil, err
}
cacheTxHash, err := NewCache[txhash.TxHash]("observations-tx-hash", config.ObservationsTxHash.NumKeys, config.ObservationsTxHash.MaxCostsInMB)
cacheTxHash, err := NewCache[string]("observations-tx-hash", config.ObservationsTxHash.NumKeys, config.ObservationsTxHash.MaxCostsInMB)
if err != nil {
return nil, err
}

View File

@ -61,6 +61,7 @@ type Configuration struct {
ObservationsDedup Cache `env:", prefix=OBSERVATIONS_DEDUP_,required"`
ObservationsTxHash Cache `env:", prefix=OBSERVATIONS_TX_HASH_,required"`
VaasDedup Cache `env:", prefix=VAAS_DEDUP_,required"`
VaasPythDedup Cache `env:", prefix=VAAS_PYTH_DEDUP_,required"`
}
type RedisConfiguration struct {

View File

@ -101,7 +101,12 @@ func main() {
}
repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, txHashStore, logger)
vaaDedup, err := builder.NewDeduplicator("vaas-dedup", cfg.VaasDedup, logger)
vaaNonPythDedup, err := builder.NewDeduplicator("vaas-dedup", cfg.VaasDedup, logger)
if err != nil {
logger.Fatal("could not create vaa deduplicator", zap.Error(err))
}
vaaPythDedup, err := builder.NewDeduplicator("vaas-pyth-dedup", cfg.VaasPythDedup, logger)
if err != nil {
logger.Fatal("could not create vaa deduplicator", zap.Error(err))
}
@ -141,7 +146,7 @@ func main() {
// When recive a message, the message filter by deduplicator
// if VAA is from pyhnet should be saved directly to repository
// if VAA is from non pyhnet should be publish with nonPythVaaPublish
vaaGossipConsumer := processor.NewVAAGossipConsumer(&guardianSetHistory, vaaDedup, nonPythVaaPublish, repository.UpsertVaa, metrics, repository, logger)
vaaGossipConsumer := processor.NewVAAGossipConsumer(&guardianSetHistory, vaaNonPythDedup, vaaPythDedup, nonPythVaaPublish, repository.UpsertVaa, metrics, repository, logger)
// Creates a instance to consume VAA messages (non pyth) from a queue and store in a storage
vaaQueueConsumer := processor.NewVAAQueueConsumer(vaaQueueConsume, repository, notifierFunc, metrics, logger)
// Creates a wrapper that splits the incoming VAAs into 2 channels (pyth to non pyth) in order

View File

@ -18,7 +18,8 @@ type vaaGossipConsumer struct {
nonPythProcess VAAPushFunc
pythProcess VAAPushFunc
logger *zap.Logger
deduplicator *deduplicator.Deduplicator
nonPythDedup *deduplicator.Deduplicator
pythDedup *deduplicator.Deduplicator
metrics metrics.Metrics
repository *storage.Repository
}
@ -26,7 +27,8 @@ type vaaGossipConsumer struct {
// NewVAAGossipConsumer creates a new processor instances.
func NewVAAGossipConsumer(
guardianSetHistory *guardiansets.GuardianSetHistory,
deduplicator *deduplicator.Deduplicator,
nonPythDedup *deduplicator.Deduplicator,
pythDedup *deduplicator.Deduplicator,
nonPythPublish VAAPushFunc,
pythPublish VAAPushFunc,
metrics metrics.Metrics,
@ -36,7 +38,8 @@ func NewVAAGossipConsumer(
return &vaaGossipConsumer{
guardianSetHistory: guardianSetHistory,
deduplicator: deduplicator,
nonPythDedup: nonPythDedup,
pythDedup: pythDedup,
nonPythProcess: nonPythPublish,
pythProcess: pythPublish,
metrics: metrics,
@ -54,22 +57,28 @@ func (p *vaaGossipConsumer) Push(ctx context.Context, v *vaa.VAA, serializedVaa
}
key := fmt.Sprintf("vaa:%s", v.MessageID())
err := p.deduplicator.Apply(ctx, key, func() error {
p.metrics.IncVaaUnfiltered(v.EmitterChain)
if vaa.ChainIDPythNet == v.EmitterChain {
var err error
if vaa.ChainIDPythNet == v.EmitterChain {
err = p.pythDedup.Apply(ctx, key, func() error {
p.metrics.IncVaaUnfiltered(v.EmitterChain)
return p.pythProcess(ctx, v, serializedVaa)
}
err := p.nonPythProcess(ctx, v, serializedVaa)
if err != nil {
p.logger.Error("Error processing vaa", zap.String("id", v.MessageID()), zap.Error(err))
// This is the fallback to store the vaa in the repository.
err = p.repository.UpsertVaa(ctx, v, serializedVaa)
if err != nil {
p.logger.Error("Error inserting vaa in repository as fallback", zap.String("id", v.MessageID()), zap.Error(err))
})
} else {
err = p.nonPythDedup.Apply(ctx, key, func() error {
p.metrics.IncVaaUnfiltered(v.EmitterChain)
pErr := p.nonPythProcess(ctx, v, serializedVaa)
if pErr != nil {
p.logger.Error("Error processing vaa", zap.String("id", v.MessageID()), zap.Error(err))
// This is the fallback to store the vaa in the repository.
pErr = p.repository.UpsertVaa(ctx, v, serializedVaa)
if pErr != nil {
p.logger.Error("Error inserting vaa in repository as fallback", zap.String("id", v.MessageID()), zap.Error(err))
}
}
}
return err
})
return pErr
})
}
if err != nil {
p.logger.Error("Error consuming from Gossip network",

View File

@ -109,7 +109,7 @@ func (s *Repository) UpsertVaa(ctx context.Context, v *vaa.VAA, serializedVaa []
s.log.Warn("Finding vaaIdTxHash", zap.String("id", id), zap.Error(err))
}
if txHash != nil {
vaaDoc.TxHash = txHash.TxHash
vaaDoc.TxHash = *txHash
}
result, err = s.collections.vaas.UpdateByID(ctx, id, update, opts)
if err != nil {

View File

@ -11,12 +11,12 @@ import (
)
type cacheTxHash struct {
cache cache.CacheInterface[TxHash]
cache cache.CacheInterface[string]
expiration time.Duration
logger *zap.Logger
}
func NewCacheTxHash(cache cache.CacheInterface[TxHash],
func NewCacheTxHash(cache cache.CacheInterface[string],
expiration time.Duration,
logger *zap.Logger) *cacheTxHash {
return &cacheTxHash{
@ -27,7 +27,7 @@ func NewCacheTxHash(cache cache.CacheInterface[TxHash],
}
func (t *cacheTxHash) Set(ctx context.Context, vaaID string, txHash TxHash) error {
if err := t.cache.Set(ctx, vaaID, txHash, store.WithCost(16), store.WithExpiration(t.expiration)); err != nil {
if err := t.cache.Set(ctx, vaaID, txHash.TxHash, store.WithCost(256), store.WithExpiration(t.expiration)); err != nil {
t.logger.Error("Error setting tx hash in cache", zap.Error(err))
return err
}
@ -43,7 +43,7 @@ func (r *cacheTxHash) SetObservation(ctx context.Context, o *gossipv1.SignedObse
return r.Set(ctx, o.MessageId, *txHash)
}
func (r *cacheTxHash) Get(ctx context.Context, vaaID string) (*TxHash, error) {
func (r *cacheTxHash) Get(ctx context.Context, vaaID string) (*string, error) {
txHash, err := r.cache.Get(ctx, vaaID)
if err == nil {
return &txHash, nil

View File

@ -71,7 +71,7 @@ func (t *composite) SetObservation(ctx context.Context, o *gossipv1.SignedObserv
return t.Set(ctx, o.MessageId, vaaTxHash)
}
func (t *composite) Get(ctx context.Context, vaaID string) (*TxHash, error) {
func (t *composite) Get(ctx context.Context, vaaID string) (*string, error) {
log := t.logger.With(zap.String("vaaId", vaaID))
for _, store := range t.hashStores {
txHash, err := store.Get(ctx, vaaID)

View File

@ -39,7 +39,7 @@ func (d *dedupTxHashStore) SetObservation(ctx context.Context, o *gossipv1.Signe
return d.Set(ctx, o.MessageId, *txHash)
}
func (d *dedupTxHashStore) Get(ctx context.Context, vaaID string) (*TxHash, error) {
func (d *dedupTxHashStore) Get(ctx context.Context, vaaID string) (*string, error) {
return d.txHashStore.Get(ctx, vaaID)
}

View File

@ -67,7 +67,7 @@ func (r *mongoTxHash) SetObservation(ctx context.Context, o *gossipv1.SignedObse
return r.Set(ctx, o.MessageId, *txHash)
}
func (m *mongoTxHash) Get(ctx context.Context, vaaID string) (*TxHash, error) {
func (m *mongoTxHash) Get(ctx context.Context, vaaID string) (*string, error) {
var mongoTxHash TxHash
if err := m.vaaIdTxHashCollection.FindOne(ctx, bson.M{"_id": vaaID}).Decode(&mongoTxHash); err != nil {
if err == mongo.ErrNoDocuments {
@ -76,7 +76,7 @@ func (m *mongoTxHash) Get(ctx context.Context, vaaID string) (*TxHash, error) {
m.logger.Error("Finding vaaIdTxHash", zap.String("id", vaaID), zap.Error(err))
return nil, err
}
return &mongoTxHash, nil
return &mongoTxHash.TxHash, nil
}
func (r *mongoTxHash) GetName() string {

View File

@ -23,7 +23,7 @@ type TxHash struct {
}
type TxHashStore interface {
Get(ctx context.Context, vaaID string) (*TxHash, error)
Get(ctx context.Context, vaaID string) (*string, error)
Set(ctx context.Context, vaaID string, txHash TxHash) error
SetObservation(ctx context.Context, o *gossipv1.SignedObservation) error
GetName() string