From fb9f93e892f1a7a6770846b870f697c8129ce86a Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Mon, 26 Sep 2022 09:04:30 -0500 Subject: [PATCH] node: add command to purge pythnet VAAs (#1636) * Add command to purge pythnet VAAs * Add test for purging a single emitter address * Fix lint error * Using the wrong delete primative Change-Id: I80d5294c17279d4e49220d81807e5964a5591721 --- node/cmd/guardiand/adminclient.go | 49 +++++ node/cmd/guardiand/adminserver.go | 13 ++ node/pkg/db/db.go | 4 + node/pkg/db/db_test.go | 7 +- node/pkg/db/purge_vaas.go | 80 ++++++++ node/pkg/db/purge_vaas_test.go | 297 ++++++++++++++++++++++++++++++ proto/node/v1/node.proto | 12 ++ 7 files changed, 461 insertions(+), 1 deletion(-) create mode 100644 node/pkg/db/purge_vaas.go create mode 100644 node/pkg/db/purge_vaas_test.go diff --git a/node/cmd/guardiand/adminclient.go b/node/cmd/guardiand/adminclient.go index 165b79e84..8f52778b5 100644 --- a/node/cmd/guardiand/adminclient.go +++ b/node/cmd/guardiand/adminclient.go @@ -55,6 +55,7 @@ func init() { ClientChainGovernorDropPendingVAACmd.Flags().AddFlagSet(pf) ClientChainGovernorReleasePendingVAACmd.Flags().AddFlagSet(pf) ClientChainGovernorResetReleaseTimerCmd.Flags().AddFlagSet(pf) + PurgePythNetVaasCmd.Flags().AddFlagSet(pf) AdminCmd.AddCommand(AdminClientInjectGuardianSetUpdateCmd) AdminCmd.AddCommand(AdminClientFindMissingMessagesCmd) @@ -67,6 +68,7 @@ func init() { AdminCmd.AddCommand(ClientChainGovernorDropPendingVAACmd) AdminCmd.AddCommand(ClientChainGovernorReleasePendingVAACmd) AdminCmd.AddCommand(ClientChainGovernorResetReleaseTimerCmd) + AdminCmd.AddCommand(PurgePythNetVaasCmd) } var AdminCmd = &cobra.Command{ @@ -137,6 +139,13 @@ var ClientChainGovernorResetReleaseTimerCmd = &cobra.Command{ Args: cobra.ExactArgs(1), } +var PurgePythNetVaasCmd = &cobra.Command{ + Use: "purge-pythnet-vaas [DAYS_OLD] ", + Short: "Deletes PythNet VAAs from the database that are more than [DAYS_OLD] days only (if logonly is specified, doesn't delete anything)", + Run: runPurgePythNetVaas, + Args: cobra.RangeArgs(1, 2), +} + func getAdminClient(ctx context.Context, addr string) (*grpc.ClientConn, nodev1.NodePrivilegedServiceClient, error) { conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:///%s", addr), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -408,3 +417,43 @@ func runChainGovernorResetReleaseTimer(cmd *cobra.Command, args []string) { fmt.Println(resp.Response) } + +func runPurgePythNetVaas(cmd *cobra.Command, args []string) { + daysOld, err := strconv.Atoi(args[0]) + if err != nil { + log.Fatalf("invalid DAYS_OLD: %v", err) + } + + if daysOld < 0 { + log.Fatalf("DAYS_OLD may not be negative") + } + + logOnly := false + if len(args) > 1 { + if args[1] != "logonly" { + log.Fatalf("invalid option, only \"logonly\" is supported") + } + + logOnly = true + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conn, c, err := getAdminClient(ctx, *clientSocketPath) + if err != nil { + log.Fatalf("failed to get admin client: %v", err) + } + defer conn.Close() + + msg := nodev1.PurgePythNetVaasRequest{ + DaysOld: uint64(daysOld), + LogOnly: logOnly, + } + resp, err := c.PurgePythNetVaas(ctx, &msg) + if err != nil { + log.Fatalf("failed to run PurgePythNetVaas RPC: %s", err) + } + + fmt.Println(resp.Response) +} diff --git a/node/cmd/guardiand/adminserver.go b/node/cmd/guardiand/adminserver.go index cae9dd53a..1ca6084d4 100644 --- a/node/cmd/guardiand/adminserver.go +++ b/node/cmd/guardiand/adminserver.go @@ -485,3 +485,16 @@ func (s *nodePrivilegedService) ChainGovernorResetReleaseTimer(ctx context.Conte Response: resp, }, nil } + +func (s *nodePrivilegedService) PurgePythNetVaas(ctx context.Context, req *nodev1.PurgePythNetVaasRequest) (*nodev1.PurgePythNetVaasResponse, error) { + prefix := db.VAAID{EmitterChain: vaa.ChainIDPythNet} + oldestTime := time.Now().Add(-time.Hour * 24 * time.Duration(req.DaysOld)) + resp, err := s.db.PurgeVaas(prefix, oldestTime, req.LogOnly) + if err != nil { + return nil, err + } + + return &nodev1.PurgePythNetVaasResponse{ + Response: resp, + }, nil +} diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index 76495cdf0..0dc3c82a0 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -61,6 +61,7 @@ func VaaIDFromVAA(v *vaa.VAA) *VAAID { var ( ErrVAANotFound = errors.New("requested VAA not found in store") + nullAddr = vaa.Address{} ) func (i *VAAID) Bytes() []byte { @@ -68,6 +69,9 @@ func (i *VAAID) Bytes() []byte { } func (i *VAAID) EmitterPrefixBytes() []byte { + if i.EmitterAddress == nullAddr { + return []byte(fmt.Sprintf("signed/%d", i.EmitterChain)) + } return []byte(fmt.Sprintf("signed/%d/%s", i.EmitterChain, i.EmitterAddress)) } diff --git a/node/pkg/db/db_test.go b/node/pkg/db/db_test.go index c5965a8b7..c4a14dc7e 100644 --- a/node/pkg/db/db_test.go +++ b/node/pkg/db/db_test.go @@ -61,7 +61,7 @@ func TestBytes(t *testing.T) { assert.Equal(t, expected, vaaID.Bytes()) } -func TestEmitterPrefixBytes(t *testing.T) { +func TestEmitterPrefixBytesWithChainIDAndAddress(t *testing.T) { vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1" vaaID, _ := VaaIDFromString(vaaIdString) expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34} @@ -69,6 +69,11 @@ func TestEmitterPrefixBytes(t *testing.T) { assert.Equal(t, expected, vaaID.EmitterPrefixBytes()) } +func TestEmitterPrefixBytesWithOnlyChainID(t *testing.T) { + vaaID := VAAID{EmitterChain: vaa.ChainID(26)} + assert.Equal(t, []byte("signed/26"), vaaID.EmitterPrefixBytes()) +} + func TestStoreSignedVAAUnsigned(t *testing.T) { dbPath := t.TempDir() db, err := Open(dbPath) diff --git a/node/pkg/db/purge_vaas.go b/node/pkg/db/purge_vaas.go new file mode 100644 index 000000000..f709ab0a6 --- /dev/null +++ b/node/pkg/db/purge_vaas.go @@ -0,0 +1,80 @@ +package db + +import ( + "fmt" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +// This function deletes all VAAs for either the specified chain or specified chain / emitter address +// that are older than the specified time. If the logOnly flag is specified, it does not delete anything, +// just counts up what it would have deleted. + +func (d *Database) PurgeVaas(prefix VAAID, oldestTime time.Time, logOnly bool) (string, error) { + if prefix.Sequence != 0 { + return "", fmt.Errorf("may not specify a sequence number on the prefix") + } + + numDeleted := 0 + numKept := 0 + + if err := d.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + prefix := prefix.EmitterPrefixBytes() + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + key := item.Key() + err := item.Value(func(val []byte) error { + v, err := vaa.Unmarshal(val) + if err != nil { + return fmt.Errorf("failed to unmarshal VAA for %s: %v", string(key), err) + } + + if v.Timestamp.Before(oldestTime) { + numDeleted++ + if !logOnly { + if err := d.db.Update(func(txn *badger.Txn) error { + err := txn.Delete(key) + return err + }); err != nil { + return fmt.Errorf("failed to delete vaa for key [%v]: %w", key, err) + } + } + } else { + numKept++ + } + + return nil + }) + if err != nil { + return err + } + } + + return nil + }); err != nil { + return "", err + } + + ret := "" + if logOnly { + ret = fmt.Sprintf("Would purge VAAs for chain %s older than %v.\n", prefix.EmitterChain, oldestTime.String()) + if numDeleted != 0 { + ret += fmt.Sprintf("Would have deleted %v items and kept %v.", numDeleted, numKept) + } else { + ret += fmt.Sprintf("Would not have deleted anything and kept %v items", numKept) + } + } else { + ret = fmt.Sprintf("Purging VAAs for chain %s older than %v.\n", prefix.EmitterChain, oldestTime.String()) + if numDeleted != 0 { + ret += fmt.Sprintf("Deleted %v items and kept %v items", numDeleted, numKept) + } else { + ret += fmt.Sprintf("Did not delete anything, kept %v items", numKept) + } + } + + return ret, nil +} diff --git a/node/pkg/db/purge_vaas_test.go b/node/pkg/db/purge_vaas_test.go new file mode 100644 index 000000000..19c20fb69 --- /dev/null +++ b/node/pkg/db/purge_vaas_test.go @@ -0,0 +1,297 @@ +package db + +import ( + "crypto/ecdsa" + "crypto/rand" + "fmt" + "os" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + + "testing" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func storeVAA(db *Database, v *vaa.VAA) error { + privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + v.AddSignature(privKey, 0) + return db.StoreSignedVAA(v) +} + +func countVAAs(d *Database, chainId vaa.ChainID) (numThisChain int, numOtherChains int, err error) { + if err = d.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + it := txn.NewIterator(opts) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + err := item.Value(func(val []byte) error { + v, err := vaa.Unmarshal(val) + if err != nil { + return fmt.Errorf("failed to unmarshal VAA for %s: %v", string(key), err) + } + + if v.EmitterChain == chainId { + numThisChain++ + } else { + numOtherChains++ + } + + return nil + }) + if err != nil { + return err + } + } + return nil + }); err != nil { + return + } + + return +} + +func TestPurgingPythnetVAAs(t *testing.T) { + var payload = []byte{97, 97, 97, 97, 97, 97} + var emitterAddress = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} + + dbPath := t.TempDir() + db, err := Open(dbPath) + if err != nil { + t.Error("failed to open database") + } + defer db.Close() + defer os.Remove(dbPath) + + now := time.Now() + + // Create 50 VAAs each for Pythnet and Solana that are more than three days old. + timeStamp := now.Add(-time.Hour * time.Duration(3*24+1)) + pythnetSeqNum := uint64(10000) + solanaSeqNum := uint64(20000) + for count := 0; count < 50; count++ { + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: pythnetSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDPythNet, + EmitterAddress: emitterAddress, + Payload: payload, + }) + require.NoError(t, err) + pythnetSeqNum++ + + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: solanaSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDSolana, + EmitterAddress: emitterAddress, + Payload: payload, + }) + require.NoError(t, err) + solanaSeqNum++ + } + + // Create 75 VAAs each for Pythnet and Solana that are less than three days old. + timeStamp = now.Add(-time.Hour * time.Duration(3*24-1)) + for count := 0; count < 75; count++ { + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: pythnetSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDPythNet, + EmitterAddress: emitterAddress, + Payload: payload, + }) + require.NoError(t, err) + pythnetSeqNum++ + + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: solanaSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDSolana, + EmitterAddress: emitterAddress, + Payload: payload, + }) + require.NoError(t, err) + solanaSeqNum++ + } + + // Before we do the purge, make sure the database contains what we expect. + numPythnet, numOther, err := countVAAs(db, vaa.ChainIDPythNet) + require.NoError(t, err) + assert.Equal(t, 125, numPythnet) + assert.Equal(t, 125, numOther) + + // Purge PythNet VAAs that are more than three days old. + oldestTime := now.Add(-time.Hour * time.Duration(3*24)) + prefix := VAAID{EmitterChain: vaa.ChainIDPythNet} + _, err = db.PurgeVaas(prefix, oldestTime, false) + require.NoError(t, err) + + // Make sure we deleted the old PythNet VAAs but didn't touch the Solana ones. + numPythnet, numOther, err = countVAAs(db, vaa.ChainIDPythNet) + require.NoError(t, err) + assert.Equal(t, 75, numPythnet) + assert.Equal(t, 125, numOther) +} + +func TestPurgingVAAsForOneEmitterAddress(t *testing.T) { + var payload = []byte{97, 97, 97, 97, 97, 97} + var pythnetEmitterAddress1 = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + var pythnetEmitterAddress2 = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} + var solanaEmitterAddress1 = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + + dbPath := t.TempDir() + db, err := Open(dbPath) + if err != nil { + t.Error("failed to open database") + } + defer db.Close() + defer os.Remove(dbPath) + + now := time.Now() + + // Create 50 VAAs each for each emitter that are more than three days old. + timeStamp := now.Add(-time.Hour * time.Duration(3*24+1)) + pythnetSeqNum := uint64(10000) + solanaSeqNum := uint64(20000) + for count := 0; count < 50; count++ { + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: pythnetSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDPythNet, + EmitterAddress: pythnetEmitterAddress1, + Payload: payload, + }) + require.NoError(t, err) + + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: pythnetSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDPythNet, + EmitterAddress: pythnetEmitterAddress2, + Payload: payload, + }) + require.NoError(t, err) + + pythnetSeqNum++ + + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: solanaSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDSolana, + EmitterAddress: solanaEmitterAddress1, + Payload: payload, + }) + require.NoError(t, err) + solanaSeqNum++ + } + + // Create 75 VAAs each for each emitter that are less than three days old. + timeStamp = now.Add(-time.Hour * time.Duration(3*24-1)) + for count := 0; count < 75; count++ { + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: pythnetSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDPythNet, + EmitterAddress: pythnetEmitterAddress1, + Payload: payload, + }) + require.NoError(t, err) + + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: pythnetSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDPythNet, + EmitterAddress: pythnetEmitterAddress2, + Payload: payload, + }) + require.NoError(t, err) + + pythnetSeqNum++ + + err = storeVAA(db, &vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: timeStamp, + Nonce: uint32(1), + Sequence: solanaSeqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDSolana, + EmitterAddress: solanaEmitterAddress1, + Payload: payload, + }) + require.NoError(t, err) + solanaSeqNum++ + } + + // Before we do the purge, make sure the database contains what we expect. + numPythnet, numOther, err := countVAAs(db, vaa.ChainIDPythNet) + require.NoError(t, err) + assert.Equal(t, 250, numPythnet) + assert.Equal(t, 125, numOther) + + // Purge VAAs for a single PythNet emitter that are more than three days old. + oldestTime := now.Add(-time.Hour * time.Duration(3*24)) + prefix := VAAID{EmitterChain: vaa.ChainIDPythNet, EmitterAddress: pythnetEmitterAddress1} + _, err = db.PurgeVaas(prefix, oldestTime, false) + require.NoError(t, err) + + // Make sure we deleted the old PythNet VAAs but didn't touch the Solana ones. + numPythnet, numOther, err = countVAAs(db, vaa.ChainIDPythNet) + require.NoError(t, err) + assert.Equal(t, 200, numPythnet) + assert.Equal(t, 125, numOther) +} diff --git a/proto/node/v1/node.proto b/proto/node/v1/node.proto index 8dcb34218..6525f4562 100644 --- a/proto/node/v1/node.proto +++ b/proto/node/v1/node.proto @@ -43,6 +43,9 @@ service NodePrivilegedService { // ChainGovernorResetReleaseTimer resets the release timer for a chain governor pending VAA to the configured maximum. rpc ChainGovernorResetReleaseTimer (ChainGovernorResetReleaseTimerRequest) returns (ChainGovernorResetReleaseTimerResponse); + + // PurgePythNetVaas deletes PythNet VAAs from the database that are more than the specified number of days old. + rpc PurgePythNetVaas (PurgePythNetVaasRequest) returns (PurgePythNetVaasResponse); } message InjectGovernanceVAARequest { @@ -200,3 +203,12 @@ message ChainGovernorResetReleaseTimerRequest { message ChainGovernorResetReleaseTimerResponse { string response = 1; } + +message PurgePythNetVaasRequest { + uint64 days_old = 1; + bool log_only = 2; +} + +message PurgePythNetVaasResponse { + string response = 1; +}