fix: deadlock when querying group members (#12342)
This commit is contained in:
parent
cc83d7474b
commit
18306a1a4c
|
@ -129,10 +129,8 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R
|
|||
return sdkerrors.Wrap(errors.ErrORMInvalidArgument, "empty index key")
|
||||
}
|
||||
|
||||
it := store.Iterator(PrefixRange(secondaryIndexKeyBytes))
|
||||
defer it.Close()
|
||||
if it.Valid() {
|
||||
return errors.ErrORMUniqueConstraint
|
||||
if err := checkUniqueIndexKey(store, secondaryIndexKeyBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
indexKey, err := buildKeyFromParts([]interface{}{secondaryIndexKey, []byte(rowID)})
|
||||
|
@ -144,6 +142,16 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R
|
|||
return nil
|
||||
}
|
||||
|
||||
// checkUniqueIndexKey checks that the given secondary index key is unique
|
||||
func checkUniqueIndexKey(store sdk.KVStore, secondaryIndexKeyBytes []byte) error {
|
||||
it := store.Iterator(PrefixRange(secondaryIndexKeyBytes))
|
||||
defer it.Close()
|
||||
if it.Valid() {
|
||||
return errors.ErrORMUniqueConstraint
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// multiKeyAddFunc allows multiple entries for a key
|
||||
func multiKeyAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID RowID) error {
|
||||
secondaryIndexKeyBytes, err := keyPartBytes(secondaryIndexKey, false)
|
||||
|
|
|
@ -170,9 +170,7 @@ func (a table) Has(store sdk.KVStore, key RowID) bool {
|
|||
return false
|
||||
}
|
||||
pStore := prefix.NewStore(store, a.prefix[:])
|
||||
it := pStore.Iterator(PrefixRange(key))
|
||||
defer it.Close()
|
||||
return it.Valid()
|
||||
return pStore.Has(key)
|
||||
}
|
||||
|
||||
// GetOne load the object persisted for the given RowID into the dest parameter.
|
||||
|
@ -252,11 +250,9 @@ func (a table) Export(store sdk.KVStore, dest ModelSlicePtr) (uint64, error) {
|
|||
// data should be a slice of structs that implement PrimaryKeyed.
|
||||
func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error {
|
||||
// Clear all data
|
||||
pStore := prefix.NewStore(store, a.prefix[:])
|
||||
it := pStore.Iterator(nil, nil)
|
||||
defer it.Close()
|
||||
for ; it.Valid(); it.Next() {
|
||||
if err := a.Delete(store, it.Key()); err != nil {
|
||||
keys := a.keys(store)
|
||||
for _, key := range keys {
|
||||
if err := a.Delete(store, key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -282,6 +278,18 @@ func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a table) keys(store sdk.KVStore) [][]byte {
|
||||
pStore := prefix.NewStore(store, a.prefix[:])
|
||||
it := pStore.Iterator(nil, nil)
|
||||
defer it.Close()
|
||||
|
||||
var keys [][]byte
|
||||
for ; it.Valid(); it.Next() {
|
||||
keys = append(keys, it.Key())
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// typeSafeIterator is initialized with a type safe RowGetter only.
|
||||
type typeSafeIterator struct {
|
||||
store sdk.KVStore
|
||||
|
|
|
@ -113,12 +113,12 @@ func NewTypeSafeRowGetter(prefixKey [2]byte, model reflect.Type, cdc codec.Codec
|
|||
}
|
||||
|
||||
pStore := prefix.NewStore(store, prefixKey[:])
|
||||
it := pStore.Iterator(PrefixRange(rowID))
|
||||
defer it.Close()
|
||||
if !it.Valid() {
|
||||
bz := pStore.Get(rowID)
|
||||
if len(bz) == 0 {
|
||||
return sdkerrors.ErrNotFound
|
||||
}
|
||||
return cdc.Unmarshal(it.Value(), dest)
|
||||
|
||||
return cdc.Unmarshal(bz, dest)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -223,12 +223,12 @@ func (k Keeper) GetGroupSequence(ctx sdk.Context) uint64 {
|
|||
return k.groupTable.Sequence().CurVal(ctx.KVStore(k.key))
|
||||
}
|
||||
|
||||
// iterateProposalsByVPEnd iterates over all proposals whose voting_period_end is after the `endTime` time argument.
|
||||
func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb func(proposal group.Proposal) (bool, error)) error {
|
||||
// proposalsByVPEnd returns all proposals whose voting_period_end is after the `endTime` time argument.
|
||||
func (k Keeper) proposalsByVPEnd(ctx sdk.Context, endTime time.Time) (proposals []group.Proposal, err error) {
|
||||
timeBytes := sdk.FormatTimeBytes(endTime)
|
||||
it, err := k.proposalsByVotingPeriodEnd.PrefixScan(ctx.KVStore(k.key), nil, timeBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
return proposals, err
|
||||
}
|
||||
defer it.Close()
|
||||
|
||||
|
@ -248,19 +248,12 @@ func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb f
|
|||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stop, err := cb(proposal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stop {
|
||||
break
|
||||
return proposals, err
|
||||
}
|
||||
proposals = append(proposals, proposal)
|
||||
}
|
||||
|
||||
return nil
|
||||
return proposals, nil
|
||||
}
|
||||
|
||||
// pruneProposal deletes a proposal from state.
|
||||
|
@ -279,22 +272,12 @@ func (k Keeper) pruneProposal(ctx sdk.Context, proposalID uint64) error {
|
|||
// abortProposals iterates through all proposals by group policy index
|
||||
// and marks submitted proposals as aborted.
|
||||
func (k Keeper) abortProposals(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) error {
|
||||
proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes())
|
||||
proposals, err := k.proposalsByGroupPolicy(ctx, groupPolicyAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer proposalIt.Close()
|
||||
|
||||
for {
|
||||
var proposalInfo group.Proposal
|
||||
_, err = proposalIt.LoadNext(&proposalInfo)
|
||||
if errors.ErrORMIteratorDone.Is(err) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, proposalInfo := range proposals {
|
||||
// Mark all proposals still in the voting phase as aborted.
|
||||
if proposalInfo.Status == group.PROPOSAL_STATUS_SUBMITTED {
|
||||
proposalInfo.Status = group.PROPOSAL_STATUS_ABORTED
|
||||
|
@ -307,26 +290,39 @@ func (k Keeper) abortProposals(ctx sdk.Context, groupPolicyAddr sdk.AccAddress)
|
|||
return nil
|
||||
}
|
||||
|
||||
// pruneVotes prunes all votes for a proposal from state.
|
||||
func (k Keeper) pruneVotes(ctx sdk.Context, proposalID uint64) error {
|
||||
store := ctx.KVStore(k.key)
|
||||
it, err := k.voteByProposalIndex.Get(store, proposalID)
|
||||
// proposalsByGroupPolicy returns all proposals for a given group policy.
|
||||
func (k Keeper) proposalsByGroupPolicy(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) ([]group.Proposal, error) {
|
||||
proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
defer it.Close()
|
||||
defer proposalIt.Close()
|
||||
|
||||
var proposals []group.Proposal
|
||||
for {
|
||||
var vote group.Vote
|
||||
_, err = it.LoadNext(&vote)
|
||||
var proposalInfo group.Proposal
|
||||
_, err = proposalIt.LoadNext(&proposalInfo)
|
||||
if errors.ErrORMIteratorDone.Is(err) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
return proposals, err
|
||||
}
|
||||
|
||||
err = k.voteTable.Delete(store, &vote)
|
||||
proposals = append(proposals, proposalInfo)
|
||||
}
|
||||
return proposals, nil
|
||||
}
|
||||
|
||||
// pruneVotes prunes all votes for a proposal from state.
|
||||
func (k Keeper) pruneVotes(ctx sdk.Context, proposalID uint64) error {
|
||||
votes, err := k.votesByProposal(ctx, proposalID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, v := range votes {
|
||||
err = k.voteTable.Delete(ctx.KVStore(k.key), &v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -335,20 +331,42 @@ func (k Keeper) pruneVotes(ctx sdk.Context, proposalID uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// votesByProposal returns all votes for a given proposal.
|
||||
func (k Keeper) votesByProposal(ctx sdk.Context, proposalID uint64) ([]group.Vote, error) {
|
||||
it, err := k.voteByProposalIndex.Get(ctx.KVStore(k.key), proposalID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer it.Close()
|
||||
|
||||
var votes []group.Vote
|
||||
for {
|
||||
var vote group.Vote
|
||||
_, err = it.LoadNext(&vote)
|
||||
if errors.ErrORMIteratorDone.Is(err) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return votes, err
|
||||
}
|
||||
votes = append(votes, vote)
|
||||
}
|
||||
return votes, nil
|
||||
}
|
||||
|
||||
// PruneProposals prunes all proposals that are expired, i.e. whose
|
||||
// `voting_period + max_execution_period` is greater than the current block
|
||||
// time.
|
||||
func (k Keeper) PruneProposals(ctx sdk.Context) error {
|
||||
err := k.iterateProposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod), func(proposal group.Proposal) (bool, error) {
|
||||
proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
for _, proposal := range proposals {
|
||||
err := k.pruneProposal(ctx, proposal.Id)
|
||||
if err != nil {
|
||||
return true, err
|
||||
return err
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -358,36 +376,39 @@ func (k Keeper) PruneProposals(ctx sdk.Context) error {
|
|||
// has ended, tallies their votes, prunes them, and updates the proposal's
|
||||
// `FinalTallyResult` field.
|
||||
func (k Keeper) TallyProposalsAtVPEnd(ctx sdk.Context) error {
|
||||
return k.iterateProposalsByVPEnd(ctx, ctx.BlockTime(), func(proposal group.Proposal) (bool, error) {
|
||||
proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime())
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
for _, proposal := range proposals {
|
||||
policyInfo, err := k.getGroupPolicyInfo(ctx, proposal.GroupPolicyAddress)
|
||||
if err != nil {
|
||||
return true, sdkerrors.Wrap(err, "group policy")
|
||||
return sdkerrors.Wrap(err, "group policy")
|
||||
}
|
||||
|
||||
electorate, err := k.getGroupInfo(ctx, policyInfo.GroupId)
|
||||
if err != nil {
|
||||
return true, sdkerrors.Wrap(err, "group")
|
||||
return sdkerrors.Wrap(err, "group")
|
||||
}
|
||||
|
||||
proposalID := proposal.Id
|
||||
if proposal.Status == group.PROPOSAL_STATUS_ABORTED || proposal.Status == group.PROPOSAL_STATUS_WITHDRAWN {
|
||||
if err := k.pruneProposal(ctx, proposalID); err != nil {
|
||||
return true, err
|
||||
return err
|
||||
}
|
||||
if err := k.pruneVotes(ctx, proposalID); err != nil {
|
||||
return true, err
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err = k.doTallyAndUpdate(ctx, &proposal, electorate, policyInfo)
|
||||
if err != nil {
|
||||
return true, sdkerrors.Wrap(err, "doTallyAndUpdate")
|
||||
return sdkerrors.Wrap(err, "doTallyAndUpdate")
|
||||
}
|
||||
|
||||
if err := k.proposalTable.Update(ctx.KVStore(k.key), proposal.Id, &proposal); err != nil {
|
||||
return true, sdkerrors.Wrap(err, "proposal update")
|
||||
return sdkerrors.Wrap(err, "proposal update")
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -99,6 +99,38 @@ func TestKeeperTestSuite(t *testing.T) {
|
|||
suite.Run(t, new(TestSuite))
|
||||
}
|
||||
|
||||
func (s *TestSuite) TestCreateGroupWithLotsOfMembers() {
|
||||
for i := 50; i < 70; i++ {
|
||||
membersResp := s.createGroupAndGetMembers(i)
|
||||
s.Require().Equal(len(membersResp), i)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestSuite) createGroupAndGetMembers(numMembers int) []*group.GroupMember {
|
||||
addressPool := simtestutil.AddTestAddrsIncremental(s.bankKeeper, s.stakingKeeper, s.sdkCtx, numMembers, sdk.NewInt(30000000))
|
||||
members := make([]group.MemberRequest, numMembers)
|
||||
for i := 0; i < len(members); i++ {
|
||||
members[i] = group.MemberRequest{
|
||||
Address: addressPool[i].String(),
|
||||
Weight: "1",
|
||||
}
|
||||
}
|
||||
|
||||
g, err := s.groupKeeper.CreateGroup(s.ctx, &group.MsgCreateGroup{
|
||||
Admin: members[0].Address,
|
||||
Members: members,
|
||||
})
|
||||
s.Require().NoErrorf(err, "failed to create group with %d members", len(members))
|
||||
s.T().Logf("group %d created with %d members", g.GroupId, len(members))
|
||||
|
||||
groupMemberResp, err := s.groupKeeper.GroupMembers(s.ctx, &group.QueryGroupMembersRequest{GroupId: g.GroupId})
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.T().Logf("got %d members from group %d", len(groupMemberResp.Members), g.GroupId)
|
||||
|
||||
return groupMemberResp.Members
|
||||
}
|
||||
|
||||
func (s *TestSuite) TestCreateGroup() {
|
||||
addrs := s.addrs
|
||||
addr1 := addrs[0]
|
||||
|
|
Loading…
Reference in New Issue