node: add command to purge pythnet VAAs (#1636)

* Add command to purge pythnet VAAs

* Add test for purging a single emitter address

* Fix lint error

* Using the wrong delete primative

Change-Id: I80d5294c17279d4e49220d81807e5964a5591721
This commit is contained in:
bruce-riley 2022-09-26 09:04:30 -05:00 committed by GitHub
parent 239e27ca91
commit fb9f93e892
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 461 additions and 1 deletions

View File

@ -55,6 +55,7 @@ func init() {
ClientChainGovernorDropPendingVAACmd.Flags().AddFlagSet(pf)
ClientChainGovernorReleasePendingVAACmd.Flags().AddFlagSet(pf)
ClientChainGovernorResetReleaseTimerCmd.Flags().AddFlagSet(pf)
PurgePythNetVaasCmd.Flags().AddFlagSet(pf)
AdminCmd.AddCommand(AdminClientInjectGuardianSetUpdateCmd)
AdminCmd.AddCommand(AdminClientFindMissingMessagesCmd)
@ -67,6 +68,7 @@ func init() {
AdminCmd.AddCommand(ClientChainGovernorDropPendingVAACmd)
AdminCmd.AddCommand(ClientChainGovernorReleasePendingVAACmd)
AdminCmd.AddCommand(ClientChainGovernorResetReleaseTimerCmd)
AdminCmd.AddCommand(PurgePythNetVaasCmd)
}
var AdminCmd = &cobra.Command{
@ -137,6 +139,13 @@ var ClientChainGovernorResetReleaseTimerCmd = &cobra.Command{
Args: cobra.ExactArgs(1),
}
var PurgePythNetVaasCmd = &cobra.Command{
Use: "purge-pythnet-vaas [DAYS_OLD] <logonly>",
Short: "Deletes PythNet VAAs from the database that are more than [DAYS_OLD] days only (if logonly is specified, doesn't delete anything)",
Run: runPurgePythNetVaas,
Args: cobra.RangeArgs(1, 2),
}
func getAdminClient(ctx context.Context, addr string) (*grpc.ClientConn, nodev1.NodePrivilegedServiceClient, error) {
conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:///%s", addr), grpc.WithTransportCredentials(insecure.NewCredentials()))
@ -408,3 +417,43 @@ func runChainGovernorResetReleaseTimer(cmd *cobra.Command, args []string) {
fmt.Println(resp.Response)
}
func runPurgePythNetVaas(cmd *cobra.Command, args []string) {
daysOld, err := strconv.Atoi(args[0])
if err != nil {
log.Fatalf("invalid DAYS_OLD: %v", err)
}
if daysOld < 0 {
log.Fatalf("DAYS_OLD may not be negative")
}
logOnly := false
if len(args) > 1 {
if args[1] != "logonly" {
log.Fatalf("invalid option, only \"logonly\" is supported")
}
logOnly = true
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, c, err := getAdminClient(ctx, *clientSocketPath)
if err != nil {
log.Fatalf("failed to get admin client: %v", err)
}
defer conn.Close()
msg := nodev1.PurgePythNetVaasRequest{
DaysOld: uint64(daysOld),
LogOnly: logOnly,
}
resp, err := c.PurgePythNetVaas(ctx, &msg)
if err != nil {
log.Fatalf("failed to run PurgePythNetVaas RPC: %s", err)
}
fmt.Println(resp.Response)
}

View File

@ -485,3 +485,16 @@ func (s *nodePrivilegedService) ChainGovernorResetReleaseTimer(ctx context.Conte
Response: resp,
}, nil
}
func (s *nodePrivilegedService) PurgePythNetVaas(ctx context.Context, req *nodev1.PurgePythNetVaasRequest) (*nodev1.PurgePythNetVaasResponse, error) {
prefix := db.VAAID{EmitterChain: vaa.ChainIDPythNet}
oldestTime := time.Now().Add(-time.Hour * 24 * time.Duration(req.DaysOld))
resp, err := s.db.PurgeVaas(prefix, oldestTime, req.LogOnly)
if err != nil {
return nil, err
}
return &nodev1.PurgePythNetVaasResponse{
Response: resp,
}, nil
}

View File

@ -61,6 +61,7 @@ func VaaIDFromVAA(v *vaa.VAA) *VAAID {
var (
ErrVAANotFound = errors.New("requested VAA not found in store")
nullAddr = vaa.Address{}
)
func (i *VAAID) Bytes() []byte {
@ -68,6 +69,9 @@ func (i *VAAID) Bytes() []byte {
}
func (i *VAAID) EmitterPrefixBytes() []byte {
if i.EmitterAddress == nullAddr {
return []byte(fmt.Sprintf("signed/%d", i.EmitterChain))
}
return []byte(fmt.Sprintf("signed/%d/%s", i.EmitterChain, i.EmitterAddress))
}

View File

@ -61,7 +61,7 @@ func TestBytes(t *testing.T) {
assert.Equal(t, expected, vaaID.Bytes())
}
func TestEmitterPrefixBytes(t *testing.T) {
func TestEmitterPrefixBytesWithChainIDAndAddress(t *testing.T) {
vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1"
vaaID, _ := VaaIDFromString(vaaIdString)
expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34}
@ -69,6 +69,11 @@ func TestEmitterPrefixBytes(t *testing.T) {
assert.Equal(t, expected, vaaID.EmitterPrefixBytes())
}
func TestEmitterPrefixBytesWithOnlyChainID(t *testing.T) {
vaaID := VAAID{EmitterChain: vaa.ChainID(26)}
assert.Equal(t, []byte("signed/26"), vaaID.EmitterPrefixBytes())
}
func TestStoreSignedVAAUnsigned(t *testing.T) {
dbPath := t.TempDir()
db, err := Open(dbPath)

80
node/pkg/db/purge_vaas.go Normal file
View File

@ -0,0 +1,80 @@
package db
import (
"fmt"
"time"
"github.com/dgraph-io/badger/v3"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// This function deletes all VAAs for either the specified chain or specified chain / emitter address
// that are older than the specified time. If the logOnly flag is specified, it does not delete anything,
// just counts up what it would have deleted.
func (d *Database) PurgeVaas(prefix VAAID, oldestTime time.Time, logOnly bool) (string, error) {
if prefix.Sequence != 0 {
return "", fmt.Errorf("may not specify a sequence number on the prefix")
}
numDeleted := 0
numKept := 0
if err := d.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
prefix := prefix.EmitterPrefixBytes()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
key := item.Key()
err := item.Value(func(val []byte) error {
v, err := vaa.Unmarshal(val)
if err != nil {
return fmt.Errorf("failed to unmarshal VAA for %s: %v", string(key), err)
}
if v.Timestamp.Before(oldestTime) {
numDeleted++
if !logOnly {
if err := d.db.Update(func(txn *badger.Txn) error {
err := txn.Delete(key)
return err
}); err != nil {
return fmt.Errorf("failed to delete vaa for key [%v]: %w", key, err)
}
}
} else {
numKept++
}
return nil
})
if err != nil {
return err
}
}
return nil
}); err != nil {
return "", err
}
ret := ""
if logOnly {
ret = fmt.Sprintf("Would purge VAAs for chain %s older than %v.\n", prefix.EmitterChain, oldestTime.String())
if numDeleted != 0 {
ret += fmt.Sprintf("Would have deleted %v items and kept %v.", numDeleted, numKept)
} else {
ret += fmt.Sprintf("Would not have deleted anything and kept %v items", numKept)
}
} else {
ret = fmt.Sprintf("Purging VAAs for chain %s older than %v.\n", prefix.EmitterChain, oldestTime.String())
if numDeleted != 0 {
ret += fmt.Sprintf("Deleted %v items and kept %v items", numDeleted, numKept)
} else {
ret += fmt.Sprintf("Did not delete anything, kept %v items", numKept)
}
}
return ret, nil
}

View File

@ -0,0 +1,297 @@
package db
import (
"crypto/ecdsa"
"crypto/rand"
"fmt"
"os"
"github.com/ethereum/go-ethereum/crypto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"testing"
"time"
"github.com/dgraph-io/badger/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func storeVAA(db *Database, v *vaa.VAA) error {
privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
v.AddSignature(privKey, 0)
return db.StoreSignedVAA(v)
}
func countVAAs(d *Database, chainId vaa.ChainID) (numThisChain int, numOtherChains int, err error) {
if err = d.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.Key()
err := item.Value(func(val []byte) error {
v, err := vaa.Unmarshal(val)
if err != nil {
return fmt.Errorf("failed to unmarshal VAA for %s: %v", string(key), err)
}
if v.EmitterChain == chainId {
numThisChain++
} else {
numOtherChains++
}
return nil
})
if err != nil {
return err
}
}
return nil
}); err != nil {
return
}
return
}
func TestPurgingPythnetVAAs(t *testing.T) {
var payload = []byte{97, 97, 97, 97, 97, 97}
var emitterAddress = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}
dbPath := t.TempDir()
db, err := Open(dbPath)
if err != nil {
t.Error("failed to open database")
}
defer db.Close()
defer os.Remove(dbPath)
now := time.Now()
// Create 50 VAAs each for Pythnet and Solana that are more than three days old.
timeStamp := now.Add(-time.Hour * time.Duration(3*24+1))
pythnetSeqNum := uint64(10000)
solanaSeqNum := uint64(20000)
for count := 0; count < 50; count++ {
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: pythnetSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDPythNet,
EmitterAddress: emitterAddress,
Payload: payload,
})
require.NoError(t, err)
pythnetSeqNum++
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: solanaSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: emitterAddress,
Payload: payload,
})
require.NoError(t, err)
solanaSeqNum++
}
// Create 75 VAAs each for Pythnet and Solana that are less than three days old.
timeStamp = now.Add(-time.Hour * time.Duration(3*24-1))
for count := 0; count < 75; count++ {
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: pythnetSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDPythNet,
EmitterAddress: emitterAddress,
Payload: payload,
})
require.NoError(t, err)
pythnetSeqNum++
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: solanaSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: emitterAddress,
Payload: payload,
})
require.NoError(t, err)
solanaSeqNum++
}
// Before we do the purge, make sure the database contains what we expect.
numPythnet, numOther, err := countVAAs(db, vaa.ChainIDPythNet)
require.NoError(t, err)
assert.Equal(t, 125, numPythnet)
assert.Equal(t, 125, numOther)
// Purge PythNet VAAs that are more than three days old.
oldestTime := now.Add(-time.Hour * time.Duration(3*24))
prefix := VAAID{EmitterChain: vaa.ChainIDPythNet}
_, err = db.PurgeVaas(prefix, oldestTime, false)
require.NoError(t, err)
// Make sure we deleted the old PythNet VAAs but didn't touch the Solana ones.
numPythnet, numOther, err = countVAAs(db, vaa.ChainIDPythNet)
require.NoError(t, err)
assert.Equal(t, 75, numPythnet)
assert.Equal(t, 125, numOther)
}
func TestPurgingVAAsForOneEmitterAddress(t *testing.T) {
var payload = []byte{97, 97, 97, 97, 97, 97}
var pythnetEmitterAddress1 = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
var pythnetEmitterAddress2 = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
var solanaEmitterAddress1 = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
dbPath := t.TempDir()
db, err := Open(dbPath)
if err != nil {
t.Error("failed to open database")
}
defer db.Close()
defer os.Remove(dbPath)
now := time.Now()
// Create 50 VAAs each for each emitter that are more than three days old.
timeStamp := now.Add(-time.Hour * time.Duration(3*24+1))
pythnetSeqNum := uint64(10000)
solanaSeqNum := uint64(20000)
for count := 0; count < 50; count++ {
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: pythnetSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDPythNet,
EmitterAddress: pythnetEmitterAddress1,
Payload: payload,
})
require.NoError(t, err)
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: pythnetSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDPythNet,
EmitterAddress: pythnetEmitterAddress2,
Payload: payload,
})
require.NoError(t, err)
pythnetSeqNum++
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: solanaSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: solanaEmitterAddress1,
Payload: payload,
})
require.NoError(t, err)
solanaSeqNum++
}
// Create 75 VAAs each for each emitter that are less than three days old.
timeStamp = now.Add(-time.Hour * time.Duration(3*24-1))
for count := 0; count < 75; count++ {
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: pythnetSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDPythNet,
EmitterAddress: pythnetEmitterAddress1,
Payload: payload,
})
require.NoError(t, err)
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: pythnetSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDPythNet,
EmitterAddress: pythnetEmitterAddress2,
Payload: payload,
})
require.NoError(t, err)
pythnetSeqNum++
err = storeVAA(db, &vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: timeStamp,
Nonce: uint32(1),
Sequence: solanaSeqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: solanaEmitterAddress1,
Payload: payload,
})
require.NoError(t, err)
solanaSeqNum++
}
// Before we do the purge, make sure the database contains what we expect.
numPythnet, numOther, err := countVAAs(db, vaa.ChainIDPythNet)
require.NoError(t, err)
assert.Equal(t, 250, numPythnet)
assert.Equal(t, 125, numOther)
// Purge VAAs for a single PythNet emitter that are more than three days old.
oldestTime := now.Add(-time.Hour * time.Duration(3*24))
prefix := VAAID{EmitterChain: vaa.ChainIDPythNet, EmitterAddress: pythnetEmitterAddress1}
_, err = db.PurgeVaas(prefix, oldestTime, false)
require.NoError(t, err)
// Make sure we deleted the old PythNet VAAs but didn't touch the Solana ones.
numPythnet, numOther, err = countVAAs(db, vaa.ChainIDPythNet)
require.NoError(t, err)
assert.Equal(t, 200, numPythnet)
assert.Equal(t, 125, numOther)
}

View File

@ -43,6 +43,9 @@ service NodePrivilegedService {
// ChainGovernorResetReleaseTimer resets the release timer for a chain governor pending VAA to the configured maximum.
rpc ChainGovernorResetReleaseTimer (ChainGovernorResetReleaseTimerRequest) returns (ChainGovernorResetReleaseTimerResponse);
// PurgePythNetVaas deletes PythNet VAAs from the database that are more than the specified number of days old.
rpc PurgePythNetVaas (PurgePythNetVaasRequest) returns (PurgePythNetVaasResponse);
}
message InjectGovernanceVAARequest {
@ -200,3 +203,12 @@ message ChainGovernorResetReleaseTimerRequest {
message ChainGovernorResetReleaseTimerResponse {
string response = 1;
}
message PurgePythNetVaasRequest {
uint64 days_old = 1;
bool log_only = 2;
}
message PurgePythNetVaasResponse {
string response = 1;
}