Node/Acct: Handle large batch status queries (#2491)

* Node/Acct: Handle large batch status queries

Change-Id: I29a8f3d88644eae1f20632318d5a497ac08720f5

* Comment change

Change-Id: I1bf3fd020e2e816edb1a81de4fcb5428f7733484
bruce-riley 2023-03-09 10:33:57 -06:00 committed by GitHub
parent c04a32bc55
commit bed48eb9e8
3 changed files with 5353 additions and 3 deletions


@@ -38,6 +38,9 @@ const (
// maxSubmitPendingTime indicates how long a transfer can be in the submit pending state before the audit starts complaining about it.
maxSubmitPendingTime = 30 * time.Minute
// maxPendingsPerQuery is the maximum number of pending transfers to submit in a single batch_transfer_status query to avoid gas errors.
maxPendingsPerQuery = 500
)
type (
@@ -293,16 +296,68 @@ func (acct *Accountant) queryMissingObservations() ([]MissingObservation, error)
return ret.Missing, nil
}
// queryConn allows us to mock the SubmitQuery call.
type queryConn interface {
SubmitQuery(ctx context.Context, contractAddress string, query []byte) ([]byte, error)
}
// queryBatchTransferStatus queries the status of the specified transfers and returns a map from transfer key (as a string) to status.
func (acct *Accountant) queryBatchTransferStatus(keys []TransferKey) (map[string]*TransferStatus, error) {
return queryBatchTransferStatusWithConn(acct.ctx, acct.logger, acct.wormchainConn, acct.contract, keys)
}
// queryBatchTransferStatusWithConn is a free function that queries the status of the specified transfers and returns a map from transfer key (as a string)
// to status. If there are too many keys to query at once, it breaks them up into smaller chunks (based on the maxPendingsPerQuery constant).
func queryBatchTransferStatusWithConn(
ctx context.Context,
logger *zap.Logger,
qc queryConn,
contract string,
keys []TransferKey,
) (map[string]*TransferStatus, error) {
if len(keys) <= maxPendingsPerQuery {
return queryBatchTransferStatusForChunk(ctx, logger, qc, contract, keys)
}
// Break the large batch into smaller chunks. Found this logic here: https://freshman.tech/snippets/go/split-slice-into-chunks/
ret := make(map[string]*TransferStatus)
for i := 0; i < len(keys); i += maxPendingsPerQuery {
end := i + maxPendingsPerQuery
// Necessary check to avoid slicing beyond slice capacity.
if end > len(keys) {
end = len(keys)
}
chunkRet, err := queryBatchTransferStatusForChunk(ctx, logger, qc, contract, keys[i:end])
if err != nil {
return nil, err
}
for key, item := range chunkRet {
ret[key] = item
}
}
return ret, nil
}
// queryBatchTransferStatusForChunk is a free function that queries the status of a single chunk of transfers and returns a map from transfer key (as a string) to status.
func queryBatchTransferStatusForChunk(
ctx context.Context,
logger *zap.Logger,
qc queryConn,
contract string,
keys []TransferKey,
) (map[string]*TransferStatus, error) {
bytes, err := json.Marshal(keys)
if err != nil {
return nil, fmt.Errorf("failed to marshal keys: %w", err)
}
query := fmt.Sprintf(`{"batch_transfer_status":%s}`, string(bytes))
acct.logger.Debug("acctaudit: submitting batch_transfer_status query", zap.String("query", query))
respBytes, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
logger.Debug("acctaudit: submitting batch_transfer_status query", zap.String("query", query))
respBytes, err := qc.SubmitQuery(ctx, contract, []byte(query))
if err != nil {
return nil, fmt.Errorf("batch_transfer_status query failed: %w, %s", err, query)
}
@@ -317,6 +372,6 @@ func (acct *Accountant) queryBatchTransferStatus(keys []TransferKey) (map[string
ret[item.Key.String()] = item.Status
}
acct.logger.Debug("acctaudit: batch_transfer_status query response", zap.Int("numEntries", len(ret)), zap.String("result", string(respBytes)))
logger.Debug("acctaudit: batch_transfer_status query response", zap.Int("numEntries", len(ret)), zap.String("result", string(respBytes)))
return ret, nil
}
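As a side note, the chunking and per-chunk query construction can be sketched standalone. A minimal, hypothetical example follows; demoKey and maxPerQuery are illustrative stand-ins for TransferKey and maxPendingsPerQuery (500 in the real code), and the real key layout may differ:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// demoKey is a hypothetical stand-in for TransferKey; the real field layout
// lives in the accountant package and may differ.
type demoKey struct {
	EmitterChain   uint16 `json:"emitter_chain"`
	EmitterAddress string `json:"emitter_address"`
	Sequence       uint64 `json:"sequence"`
}

// maxPerQuery stands in for maxPendingsPerQuery (500 in the real code).
const maxPerQuery = 3

func main() {
	keys := make([]demoKey, 7)
	for i := range keys {
		keys[i] = demoKey{EmitterChain: 2, EmitterAddress: "0000beef", Sequence: uint64(i)}
	}

	// Same idiom as queryBatchTransferStatusWithConn: step by the chunk size
	// and clamp the final slice bound to len(keys). Seven keys with a chunk
	// size of 3 yield three queries of 3, 3, and 1 keys, mirroring how a
	// 1,201-key backlog would become queries of 500, 500, and 201 keys.
	for i := 0; i < len(keys); i += maxPerQuery {
		end := i + maxPerQuery
		if end > len(keys) {
			end = len(keys)
		}
		chunk, err := json.Marshal(keys[i:end])
		if err != nil {
			panic(err)
		}
		fmt.Printf(`{"batch_transfer_status":%s}`+"\n", chunk)
	}
}
```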

File diff suppressed because it is too large.


@@ -1,8 +1,10 @@
package accountant
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"reflect"
"testing"
@@ -12,6 +14,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestParseMissingObservationsResponse(t *testing.T) {
@@ -145,3 +149,86 @@ func TestParseBatchTransferStatusPendingResponse(t *testing.T) {
// Use DeepEqual() because the response contains pointers.
assert.True(t, reflect.DeepEqual(expectedResult, response.Details[0]))
}
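The DeepEqual choice can be illustrated with a minimal sketch (the status struct and its Digest field below are hypothetical): on structs with pointer fields, the == operator compares the pointer values themselves, while reflect.DeepEqual dereferences them and compares the pointed-to values.

```go
package main

import (
	"fmt"
	"reflect"
)

// status is an illustrative struct with a pointer field.
type status struct{ Digest *string }

func main() {
	a, b := "abc", "abc"
	x := status{Digest: &a}
	y := status{Digest: &b}
	fmt.Println(x == y)                  // false: distinct pointer values
	fmt.Println(reflect.DeepEqual(x, y)) // true: equal pointed-to values
}
```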
// BatchTransferStatusQueryConnMock allows us to mock batch_transfer_status by implementing SubmitQuery.
type BatchTransferStatusQueryConnMock struct {
resp []byte
}
func (qc *BatchTransferStatusQueryConnMock) SubmitQuery(ctx context.Context, contractAddress string, query []byte) ([]byte, error) {
// Force a failure if the query is much bigger than a single chunk should allow. The bound does not need to be exact,
// since the chunking tests use far more keys than would fit in one chunk. A JSON-encoded transfer key is about 150 characters.
if len(query) > 150*maxPendingsPerQuery+1000 {
return []byte{}, errors.New("query too large")
}
return qc.resp, nil
}
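One design note on the mock: it returns the same canned response for every chunk, so the multi-chunk tests exercise only the splitting and result-merging logic. Because merging duplicate keys into the result map is idempotent, the final map still matches the full key set.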
// validateBatchTransferStatusResults makes sure the query returned everything expected, and nothing extra.
func validateBatchTransferStatusResults(t *testing.T, keys []TransferKey, transferDetails map[string]*TransferStatus) {
for _, key := range keys {
tKey := key.String()
_, exists := transferDetails[tKey]
require.True(t, exists)
delete(transferDetails, tKey)
}
require.Empty(t, transferDetails)
}
func TestBatchTransferStatusForExactlyOneTransfer(t *testing.T) {
ctx := context.Background()
logger := zap.NewNop()
keys, queryResp := createTransferKeysForTestingBatchTransferStatus(t, 1)
require.Equal(t, 1, len(keys))
qc := &BatchTransferStatusQueryConnMock{resp: queryResp}
transferDetails, err := queryBatchTransferStatusWithConn(ctx, logger, qc, "wormhole14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9srrg465", keys)
require.NoError(t, err)
require.Equal(t, len(keys), len(transferDetails))
validateBatchTransferStatusResults(t, keys, transferDetails)
}
func TestBatchTransferStatusForExactlyOneChunk(t *testing.T) {
ctx := context.Background()
logger := zap.NewNop()
keys, queryResp := createTransferKeysForTestingBatchTransferStatus(t, maxPendingsPerQuery)
require.Equal(t, maxPendingsPerQuery, len(keys))
qc := &BatchTransferStatusQueryConnMock{resp: queryResp}
transferDetails, err := queryBatchTransferStatusWithConn(ctx, logger, qc, "wormhole14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9srrg465", keys)
require.NoError(t, err)
require.Equal(t, len(keys), len(transferDetails))
validateBatchTransferStatusResults(t, keys, transferDetails)
}
func TestBatchTransferStatusForExactlyOneChunkPlus1(t *testing.T) {
ctx := context.Background()
logger := zap.NewNop()
keys, queryResp := createTransferKeysForTestingBatchTransferStatus(t, maxPendingsPerQuery+1)
require.Equal(t, maxPendingsPerQuery+1, len(keys))
qc := &BatchTransferStatusQueryConnMock{resp: queryResp}
transferDetails, err := queryBatchTransferStatusWithConn(ctx, logger, qc, "wormhole14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9srrg465", keys)
require.NoError(t, err)
require.Equal(t, len(keys), len(transferDetails))
validateBatchTransferStatusResults(t, keys, transferDetails)
}
func TestBatchTransferStatusMultipleChunks(t *testing.T) {
ctx := context.Background()
logger := zap.NewNop()
keys, queryResp := createTransferKeysForTestingBatchTransferStatus(t, -1)
require.Less(t, maxPendingsPerQuery, len(keys))
qc := &BatchTransferStatusQueryConnMock{resp: queryResp}
transferDetails, err := queryBatchTransferStatusWithConn(ctx, logger, qc, "wormhole14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9srrg465", keys)
require.NoError(t, err)
require.Equal(t, len(keys), len(transferDetails))
validateBatchTransferStatusResults(t, keys, transferDetails)
}