wormhole/node/pkg/governor/governor_db.go

231 lines
7.2 KiB
Go

// This file contains the code to load transfers and pending messages from the database.
package governor
import (
"sort"
"time"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
func (gov *ChainGovernor) loadFromDB() error {
gov.mutex.Lock()
defer gov.mutex.Unlock()
return gov.loadFromDBAlreadyLocked()
}
func (gov *ChainGovernor) loadFromDBAlreadyLocked() error {
xfers, pending, err := gov.db.GetChainGovernorData(gov.logger)
if err != nil {
gov.logger.Error("failed to reload transactions from db", zap.Error(err))
return err
}
now := time.Now()
if len(pending) != 0 {
sort.SliceStable(pending, func(i, j int) bool {
return pending[i].Msg.Timestamp.Before(pending[j].Msg.Timestamp)
})
for _, p := range pending {
gov.reloadPendingTransfer(p)
}
}
if len(xfers) != 0 {
sort.SliceStable(xfers, func(i, j int) bool {
return xfers[i].Timestamp.Before(xfers[j].Timestamp)
})
startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
for _, xfer := range xfers {
if startTime.Before(xfer.Timestamp) {
gov.reloadTransfer(xfer)
} else {
if err := gov.db.DeleteTransfer(xfer); err != nil {
return err
}
}
}
}
return nil
}
func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
msg := &pending.Msg
ce, exists := gov.chains[msg.EmitterChain]
if !exists {
gov.logger.Error("reloaded pending transfer for unsupported chain, dropping it",
zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp),
zap.Uint32("Nonce", msg.Nonce),
zap.Uint64("Sequence", msg.Sequence),
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
zap.Stringer("EmitterChain", msg.EmitterChain),
zap.Stringer("EmitterAddress", msg.EmitterAddress),
)
return
}
if msg.EmitterAddress != ce.emitterAddr {
gov.logger.Error("reloaded pending transfer for unsupported emitter address, dropping it",
zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp),
zap.Uint32("Nonce", msg.Nonce),
zap.Uint64("Sequence", msg.Sequence),
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
zap.Stringer("EmitterChain", msg.EmitterChain),
zap.Stringer("EmitterAddress", msg.EmitterAddress),
)
return
}
payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
if err != nil {
gov.logger.Error("failed to parse payload for reloaded pending transfer, dropping it",
zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp),
zap.Uint32("Nonce", msg.Nonce),
zap.Uint64("Sequence", msg.Sequence),
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
zap.Stringer("EmitterChain", msg.EmitterChain),
zap.Stringer("EmitterAddress", msg.EmitterAddress),
zap.Error(err),
)
return
}
tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress}
token, exists := gov.tokens[tk]
if !exists {
gov.logger.Error("reloaded pending transfer for unsupported token, dropping it",
zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp),
zap.Uint32("Nonce", msg.Nonce),
zap.Uint64("Sequence", msg.Sequence),
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
zap.Stringer("EmitterChain", msg.EmitterChain),
zap.Stringer("EmitterAddress", msg.EmitterAddress),
zap.Stringer("tokenChain", payload.OriginChain),
zap.Stringer("tokenAddress", payload.OriginAddress),
)
return
}
hash := gov.HashFromMsg(msg)
if _, alreadyExists := gov.msgsSeen[hash]; alreadyExists {
gov.logger.Error("not reloading pending transfer because it is a duplicate",
zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp),
zap.Uint32("Nonce", msg.Nonce),
zap.Uint64("Sequence", msg.Sequence),
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
zap.Stringer("EmitterChain", msg.EmitterChain),
zap.Stringer("EmitterAddress", msg.EmitterAddress),
zap.Stringer("Amount", payload.Amount),
zap.String("Hash", hash),
)
return
}
gov.logger.Info("reloaded pending transfer",
zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp),
zap.Uint32("Nonce", msg.Nonce),
zap.Uint64("Sequence", msg.Sequence),
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
zap.Stringer("EmitterChain", msg.EmitterChain),
zap.Stringer("EmitterAddress", msg.EmitterAddress),
zap.Stringer("Amount", payload.Amount),
zap.String("Hash", hash),
)
ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, hash: hash, dbData: *pending})
gov.msgsSeen[hash] = transferEnqueued
}
func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) {
ce, exists := gov.chains[xfer.EmitterChain]
if !exists {
gov.logger.Error("reloaded transfer for unsupported chain, dropping it",
zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value),
zap.Stringer("EmitterChain", xfer.EmitterChain),
zap.Stringer("EmitterAddress", xfer.EmitterAddress),
zap.String("MsgID", xfer.MsgID),
)
return
}
if xfer.EmitterAddress != ce.emitterAddr {
gov.logger.Error("reloaded transfer for unsupported emitter address, dropping it",
zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain),
zap.Stringer("OriginAddress", xfer.OriginAddress),
zap.String("MsgID", xfer.MsgID),
)
return
}
tk := tokenKey{chain: xfer.OriginChain, addr: xfer.OriginAddress}
_, exists = gov.tokens[tk]
if !exists {
gov.logger.Error("reloaded transfer for unsupported token, dropping it",
zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain),
zap.Stringer("OriginAddress", xfer.OriginAddress),
zap.String("MsgID", xfer.MsgID),
)
return
}
if _, alreadyExists := gov.msgsSeen[xfer.Hash]; alreadyExists {
gov.logger.Info("not reloading transfer because it is a duplicate",
zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain),
zap.Stringer("OriginAddress", xfer.OriginAddress),
zap.String("MsgID", xfer.MsgID),
zap.String("Hash", xfer.Hash),
)
return
}
if xfer.Hash != "" {
gov.logger.Info("reloaded transfer",
zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain),
zap.Stringer("OriginAddress", xfer.OriginAddress),
zap.String("MsgID", xfer.MsgID),
zap.String("Hash", xfer.Hash),
)
gov.msgsSeen[xfer.Hash] = transferComplete
} else {
gov.logger.Error("reloaded transfer that does not have a hash, will not be able to detect a duplicate",
zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain),
zap.Stringer("OriginAddress", xfer.OriginAddress),
zap.String("MsgID", xfer.MsgID),
)
}
ce.transfers = append(ce.transfers, xfer)
}