Merge pull request #115 from ava-labs/memory-improvements

Memory improvements
This commit is contained in:
Stephen Buttolph 2020-06-23 17:37:54 -04:00 committed by GitHub
commit 18c8f949a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 140 additions and 45 deletions

14
database/common.go Normal file
View File

@ -0,0 +1,14 @@
package database
const (
// MaxExcessCapacityFactor is the threshold used to decide whether a
// batch's underlying array should be shrunk when the batch is reset:
// if, when a batch is reset, cap(batch)/len(batch) > MaxExcessCapacityFactor,
// the underlying array's capacity will be reduced by a factor of
// CapacityReductionFactor.
// Higher value for MaxExcessCapacityFactor --> less aggressive array downsizing --> less memory allocations
// but more unnecessary data in the underlying array that can't be garbage collected.
MaxExcessCapacityFactor = 4
// CapacityReductionFactor is the factor by which an oversized array's
// capacity is divided when it is downsized.
// Higher value for CapacityReductionFactor --> more aggressive array downsizing --> more memory allocations
// but less unnecessary data in the underlying array that can't be garbage collected.
CapacityReductionFactor = 2
)

View File

@ -201,7 +201,11 @@ func (b *batch) Write() error {
// Reset resets the batch for reuse.
func (b *batch) Reset() {
b.writes = b.writes[:0]
if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor {
b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor)
} else {
b.writes = b.writes[:0]
}
b.Batch.Reset()
}

View File

@ -13,8 +13,10 @@ import (
"github.com/ava-labs/gecko/utils"
)
// DefaultSize is the default initial size of the memory database
const DefaultSize = 1 << 10
const (
// DefaultSize is the default initial size of the memory database
DefaultSize = 1 << 10
)
// Database is an ephemeral key-value store that implements the Database
// interface.
@ -191,7 +193,11 @@ func (b *batch) Write() error {
// Reset implements the Batch interface
func (b *batch) Reset() {
b.writes = b.writes[:0]
if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor {
b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor)
} else {
b.writes = b.writes[:0]
}
b.size = 0
}

View File

@ -199,7 +199,11 @@ func (b *batch) Write() error {
// Reset resets the batch for reuse.
func (b *batch) Reset() {
b.writes = b.writes[:0]
if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor {
b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor)
} else {
b.writes = b.writes[:0]
}
b.Batch.Reset()
}

View File

@ -180,7 +180,11 @@ func (b *batch) Write() error {
}
func (b *batch) Reset() {
b.writes = b.writes[:0]
if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor {
b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor)
} else {
b.writes = b.writes[:0]
}
b.size = 0
}

View File

@ -195,6 +195,7 @@ func (db *Database) Commit() error {
if err := batch.Write(); err != nil {
return err
}
batch.Reset()
db.abort()
return nil
}
@ -209,9 +210,10 @@ func (db *Database) Abort() {
func (db *Database) abort() { db.mem = make(map[string]valueDelete, memdb.DefaultSize) }
// CommitBatch returns a batch that will commit all pending writes to the
// underlying database. The returned batch should be written before future calls
// to this DB unless the batch will never be written.
// CommitBatch returns a batch that contains all uncommitted puts/deletes.
// Calling Write() on the returned batch causes the puts/deletes to be
// written to the underlying database. The returned batch should be written before
// future calls to this DB unless the batch will never be written.
func (db *Database) CommitBatch() (database.Batch, error) {
db.lock.Lock()
defer db.lock.Unlock()
@ -219,6 +221,8 @@ func (db *Database) CommitBatch() (database.Batch, error) {
return db.commitBatch()
}
// Put all of the puts/deletes in memory into db.batch
// and return the batch
func (db *Database) commitBatch() (database.Batch, error) {
if db.mem == nil {
return nil, database.ErrClosed
@ -234,9 +238,6 @@ func (db *Database) commitBatch() (database.Batch, error) {
return nil, err
}
}
if err := db.batch.Write(); err != nil {
return nil, err
}
return db.batch, nil
}
@ -249,6 +250,7 @@ func (db *Database) Close() error {
if db.mem == nil {
return database.ErrClosed
}
db.batch = nil
db.mem = nil
db.db = nil
return nil
@ -303,7 +305,11 @@ func (b *batch) Write() error {
// Reset implements the Database interface
func (b *batch) Reset() {
b.writes = b.writes[:0]
if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor {
b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor)
} else {
b.writes = b.writes[:0]
}
b.size = 0
}

View File

@ -299,6 +299,10 @@ func TestCommitBatch(t *testing.T) {
if err := db.Put(key1, value1); err != nil {
t.Fatalf("Unexpected error on db.Put: %s", err)
} else if has, err := baseDB.Has(key1); err != nil {
t.Fatalf("Unexpected error on db.Has: %s", err)
} else if has {
t.Fatalf("Unexpected result of db.Has: %v", has)
}
batch, err := db.CommitBatch()
@ -307,7 +311,11 @@ func TestCommitBatch(t *testing.T) {
}
db.Abort()
if err := batch.Write(); err != nil {
if has, err := db.Has(key1); err != nil {
t.Fatalf("Unexpected error on db.Has: %s", err)
} else if has {
t.Fatalf("Unexpected result of db.Has: %v", has)
} else if err := batch.Write(); err != nil {
t.Fatalf("Unexpected error on batch.Write: %s", err)
}

View File

@ -57,15 +57,23 @@ func (ids *ShortSet) Remove(idList ...ShortID) {
// Clear empties this set
func (ids *ShortSet) Clear() { *ids = nil }
// CappedList returns a list of length at most [size]. Size should be >= 0
// CappedList returns a list of length at most [size].
// Size should be >= 0. If size < 0, returns nil.
func (ids ShortSet) CappedList(size int) []ShortID {
idList := make([]ShortID, size)[:0]
if size < 0 {
return nil
}
if l := ids.Len(); l < size {
size = l
}
i := 0
idList := make([]ShortID, size)
for id := range ids {
if size <= 0 {
if i >= size {
break
}
size--
idList = append(idList, NewShortID(id))
idList[i] = NewShortID(id)
i++
}
return idList
}

View File

@ -10,6 +10,10 @@ import (
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
)
const (
minMapSize = 16
)
// TopologicalFactory implements Factory by returning a topological struct
type TopologicalFactory struct{}
@ -65,12 +69,12 @@ func (ta *Topological) Initialize(ctx *snow.Context, params Parameters, frontier
ta.ctx.Log.Error("%s", err)
}
ta.nodes = make(map[[32]byte]Vertex)
ta.nodes = make(map[[32]byte]Vertex, minMapSize)
ta.cg = &snowstorm.Directed{}
ta.cg.Initialize(ctx, params.Parameters)
ta.frontier = make(map[[32]byte]Vertex)
ta.frontier = make(map[[32]byte]Vertex, minMapSize)
for _, vtx := range frontier {
ta.frontier[vtx.ID().Key()] = vtx
}
@ -159,7 +163,7 @@ func (ta *Topological) Finalized() bool { return ta.cg.Finalized() }
// the non-transitively applied votes. Also returns the list of leaf nodes.
func (ta *Topological) calculateInDegree(
responses ids.UniqueBag) (map[[32]byte]kahnNode, []ids.ID) {
kahns := make(map[[32]byte]kahnNode)
kahns := make(map[[32]byte]kahnNode, minMapSize)
leaves := ids.Set{}
for _, vote := range responses.List() {
@ -233,7 +237,7 @@ func (ta *Topological) pushVotes(
kahnNodes map[[32]byte]kahnNode,
leaves []ids.ID) ids.Bag {
votes := make(ids.UniqueBag)
txConflicts := make(map[[32]byte]ids.Set)
txConflicts := make(map[[32]byte]ids.Set, minMapSize)
for len(leaves) > 0 {
newLeavesSize := len(leaves) - 1
@ -443,9 +447,9 @@ func (ta *Topological) updateFrontiers() error {
ta.preferred.Clear()
ta.virtuous.Clear()
ta.orphans.Clear()
ta.frontier = make(map[[32]byte]Vertex)
ta.preferenceCache = make(map[[32]byte]bool)
ta.virtuousCache = make(map[[32]byte]bool)
ta.frontier = make(map[[32]byte]Vertex, minMapSize)
ta.preferenceCache = make(map[[32]byte]bool, minMapSize)
ta.virtuousCache = make(map[[32]byte]bool, minMapSize)
ta.orphans.Union(ta.cg.Virtuous()) // Initially, nothing is preferred

View File

@ -9,6 +9,10 @@ import (
"github.com/ava-labs/gecko/snow/consensus/snowball"
)
const (
minMapSize = 16
)
// TopologicalFactory implements Factory by returning a topological struct
type TopologicalFactory struct{}
@ -183,7 +187,7 @@ func (ts *Topological) Finalized() bool { return len(ts.blocks) == 1 }
// the non-transitively applied votes. Also returns the list of leaf blocks.
func (ts *Topological) calculateInDegree(
votes ids.Bag) (map[[32]byte]kahnNode, []ids.ID) {
kahns := make(map[[32]byte]kahnNode)
kahns := make(map[[32]byte]kahnNode, minMapSize)
leaves := ids.Set{}
for _, vote := range votes.List() {

View File

@ -13,6 +13,19 @@ import (
"github.com/ava-labs/gecko/utils/random"
)
const (
// maxExcessCapacityFactor is the threshold used to decide whether the
// validator set's underlying arrays should be shrunk when the set is reset:
// if, when the validator set is reset, cap(set)/len(set) > maxExcessCapacityFactor,
// the underlying arrays' capacities will be reduced by a factor of
// capacityReductionFactor.
// Higher value for maxExcessCapacityFactor --> less aggressive array downsizing --> less memory allocations
// but more unnecessary data in the underlying array that can't be garbage collected.
maxExcessCapacityFactor = 4
// capacityReductionFactor is the factor by which an oversized array's
// capacity is divided when it is downsized.
// Higher value for capacityReductionFactor --> more aggressive array downsizing --> more memory allocations
// but less unnecessary data in the underlying array that can't be garbage collected.
capacityReductionFactor = 2
)
// Set of validators that can be sampled
type Set interface {
fmt.Stringer
@ -71,9 +84,21 @@ func (s *set) Set(vdrs []Validator) {
}
func (s *set) set(vdrs []Validator) {
s.vdrMap = make(map[[20]byte]int, len(vdrs))
s.vdrSlice = s.vdrSlice[:0]
s.sampler.Weights = s.sampler.Weights[:0]
lenVdrs := len(vdrs)
// If the underlying arrays are much larger than necessary, resize them to
// allow garbage collection of unused memory
if cap(s.vdrSlice) > len(s.vdrSlice)*maxExcessCapacityFactor {
newCap := cap(s.vdrSlice) / capacityReductionFactor
if newCap < lenVdrs {
newCap = lenVdrs
}
s.vdrSlice = make([]Validator, 0, newCap)
s.sampler.Weights = make([]uint64, 0, newCap)
} else {
s.vdrSlice = s.vdrSlice[:0]
s.sampler.Weights = s.sampler.Weights[:0]
}
s.vdrMap = make(map[[20]byte]int, lenVdrs)
for _, vdr := range vdrs {
s.add(vdr)

View File

@ -9,6 +9,7 @@ import (
"github.com/ava-labs/gecko/chains/atomic"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/database/prefixdb"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
@ -117,9 +118,10 @@ func TestIssueExportTx(t *testing.T) {
genesisBytes := BuildGenesisTest(t)
issuer := make(chan common.Message, 1)
baseDB := memdb.New()
sm := &atomic.SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB))
ctx := snow.DefaultContextTest()
ctx.NetworkID = networkID
@ -138,7 +140,7 @@ func TestIssueExportTx(t *testing.T) {
}
err := vm.Initialize(
ctx,
memdb.New(),
prefixdb.New([]byte{1}, baseDB),
genesisBytes,
issuer,
[]*common.Fx{{
@ -273,9 +275,10 @@ func TestClearForceAcceptedExportTx(t *testing.T) {
genesisBytes := BuildGenesisTest(t)
issuer := make(chan common.Message, 1)
baseDB := memdb.New()
sm := &atomic.SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB))
ctx := snow.DefaultContextTest()
ctx.NetworkID = networkID
@ -294,7 +297,7 @@ func TestClearForceAcceptedExportTx(t *testing.T) {
}
err := vm.Initialize(
ctx,
memdb.New(),
prefixdb.New([]byte{1}, baseDB),
genesisBytes,
issuer,
[]*common.Fx{{

View File

@ -9,6 +9,7 @@ import (
"github.com/ava-labs/gecko/chains/atomic"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/database/prefixdb"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
@ -106,9 +107,10 @@ func TestIssueImportTx(t *testing.T) {
genesisBytes := BuildGenesisTest(t)
issuer := make(chan common.Message, 1)
baseDB := memdb.New()
sm := &atomic.SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB))
ctx := snow.DefaultContextTest()
ctx.NetworkID = networkID
@ -127,7 +129,7 @@ func TestIssueImportTx(t *testing.T) {
}
err := vm.Initialize(
ctx,
memdb.New(),
prefixdb.New([]byte{1}, baseDB),
genesisBytes,
issuer,
[]*common.Fx{{
@ -265,9 +267,10 @@ func TestForceAcceptImportTx(t *testing.T) {
genesisBytes := BuildGenesisTest(t)
issuer := make(chan common.Message, 1)
baseDB := memdb.New()
sm := &atomic.SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB))
ctx := snow.DefaultContextTest()
ctx.NetworkID = networkID
@ -285,7 +288,7 @@ func TestForceAcceptImportTx(t *testing.T) {
err := vm.Initialize(
ctx,
memdb.New(),
prefixdb.New([]byte{1}, baseDB),
genesisBytes,
issuer,
[]*common.Fx{{

View File

@ -492,10 +492,10 @@ func (vm *VM) parseTx(b []byte) (*UniqueTx, error) {
if err := vm.state.SetTx(tx.ID(), tx.Tx); err != nil {
return nil, err
}
if err := tx.setStatus(choices.Processing); err != nil {
return nil, err
}
return tx, vm.db.Commit()
}
return tx, nil

View File

@ -18,6 +18,7 @@ import (
"github.com/ava-labs/gecko/snow/consensus/snowman"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/codec"
"github.com/ava-labs/gecko/utils/crypto"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/logging"
@ -26,7 +27,6 @@ import (
"github.com/ava-labs/gecko/utils/units"
"github.com/ava-labs/gecko/utils/wrappers"
"github.com/ava-labs/gecko/vms/components/ava"
"github.com/ava-labs/gecko/utils/codec"
"github.com/ava-labs/gecko/vms/components/core"
"github.com/ava-labs/gecko/vms/secp256k1fx"
)
@ -808,9 +808,11 @@ func (vm *VM) getValidators(validatorEvents *EventHeap) []validators.Validator {
validator.Wght = weight
}
vdrList := make([]validators.Validator, len(vdrMap))[:0]
vdrList := make([]validators.Validator, len(vdrMap))
i := 0
for _, validator := range vdrMap {
vdrList = append(vdrList, validator)
vdrList[i] = validator
i++
}
return vdrList
}

View File

@ -137,7 +137,7 @@ func defaultVM() *VM {
vm.validators.PutValidatorSet(DefaultSubnetID, defaultSubnet)
vm.clock.Set(defaultGenesisTime)
db := memdb.New()
db := prefixdb.New([]byte{0}, memdb.New())
msgChan := make(chan common.Message, 1)
ctx := defaultContext()
ctx.Lock.Lock()
@ -1189,7 +1189,7 @@ func TestAtomicImport(t *testing.T) {
key := keys[0]
sm := &atomic.SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, vm.DB.GetDatabase()))
vm.Ctx.SharedMemory = sm.NewBlockchainSharedMemory(vm.Ctx.ChainID)
@ -1282,7 +1282,7 @@ func TestOptimisticAtomicImport(t *testing.T) {
key := keys[0]
sm := &atomic.SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, vm.DB.GetDatabase()))
vm.Ctx.SharedMemory = sm.NewBlockchainSharedMemory(vm.Ctx.ChainID)