diff --git a/database/common.go b/database/common.go new file mode 100644 index 0000000..26b0531 --- /dev/null +++ b/database/common.go @@ -0,0 +1,14 @@ +package database + +const ( + // MaxExcessCapacityFactor ... + // If, when a batch is reset, the cap(batch)/len(batch) > MaxExcessCapacityFactor, + // the underlying array's capacity will be reduced by a factor of capacityReductionFactor. + // Higher value for MaxExcessCapacityFactor --> less aggressive array downsizing --> less memory allocations + // but more unnecessary data in the underlying array that can't be garbage collected. + // Higher value for CapacityReductionFactor --> more aggressive array downsizing --> more memory allocations + // but less unnecessary data in the underlying array that can't be garbage collected. + MaxExcessCapacityFactor = 4 + // CapacityReductionFactor ... + CapacityReductionFactor = 2 +) diff --git a/database/encdb/db.go b/database/encdb/db.go index fe33fa7..8f0d8e3 100644 --- a/database/encdb/db.go +++ b/database/encdb/db.go @@ -201,7 +201,11 @@ func (b *batch) Write() error { // Reset resets the batch for reuse. func (b *batch) Reset() { - b.writes = b.writes[:0] + if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor { + b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor) + } else { + b.writes = b.writes[:0] + } b.Batch.Reset() } diff --git a/database/memdb/db.go b/database/memdb/db.go index de0cae3..94ba395 100644 --- a/database/memdb/db.go +++ b/database/memdb/db.go @@ -13,8 +13,10 @@ import ( "github.com/ava-labs/gecko/utils" ) -// DefaultSize is the default initial size of the memory database -const DefaultSize = 1 << 10 +const ( + // DefaultSize is the default initial size of the memory database + DefaultSize = 1 << 10 +) // Database is an ephemeral key-value store that implements the Database // interface. @@ -191,7 +193,11 @@ func (b *batch) Write() error { // Reset implements the Batch interface func (b *batch) Reset() { - b.writes = b.writes[:0] + if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor { + b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor) + } else { + b.writes = b.writes[:0] + } b.size = 0 } diff --git a/database/prefixdb/db.go b/database/prefixdb/db.go index 34bc50d..7f606b2 100644 --- a/database/prefixdb/db.go +++ b/database/prefixdb/db.go @@ -199,7 +199,11 @@ func (b *batch) Write() error { // Reset resets the batch for reuse. func (b *batch) Reset() { - b.writes = b.writes[:0] + if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor { + b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor) + } else { + b.writes = b.writes[:0] + } b.Batch.Reset() } diff --git a/database/rpcdb/db_client.go b/database/rpcdb/db_client.go index dc3f60b..401e404 100644 --- a/database/rpcdb/db_client.go +++ b/database/rpcdb/db_client.go @@ -180,7 +180,11 @@ func (b *batch) Write() error { } func (b *batch) Reset() { - b.writes = b.writes[:0] + if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor { + b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor) + } else { + b.writes = b.writes[:0] + } b.size = 0 } diff --git a/database/versiondb/db.go b/database/versiondb/db.go index 7223c55..a1f9a18 100644 --- a/database/versiondb/db.go +++ b/database/versiondb/db.go @@ -195,6 +195,7 @@ func (db *Database) Commit() error { if err := batch.Write(); err != nil { return err } + batch.Reset() db.abort() return nil } @@ -209,9 +210,10 @@ func (db *Database) Abort() { func (db *Database) abort() { db.mem = make(map[string]valueDelete, memdb.DefaultSize) } -// CommitBatch returns a batch that will commit all pending writes to the -// underlying database. The returned batch should be written before future calls -// to this DB unless the batch will never be written. +// CommitBatch returns a batch that contains all uncommitted puts/deletes. +// Calling Write() on the returned batch causes the puts/deletes to be +// written to the underlying database. The returned batch should be written before +// future calls to this DB unless the batch will never be written. func (db *Database) CommitBatch() (database.Batch, error) { db.lock.Lock() defer db.lock.Unlock() @@ -219,6 +221,8 @@ func (db *Database) CommitBatch() (database.Batch, error) { return db.commitBatch() } +// Put all of the puts/deletes in memory into db.batch +// and return the batch func (db *Database) commitBatch() (database.Batch, error) { if db.mem == nil { return nil, database.ErrClosed @@ -234,9 +238,6 @@ func (db *Database) commitBatch() (database.Batch, error) { return nil, err } } - if err := db.batch.Write(); err != nil { - return nil, err - } return db.batch, nil } @@ -249,6 +250,7 @@ func (db *Database) Close() error { if db.mem == nil { return database.ErrClosed } + db.batch = nil db.mem = nil db.db = nil return nil @@ -303,7 +305,11 @@ func (b *batch) Write() error { // Reset implements the Database interface func (b *batch) Reset() { - b.writes = b.writes[:0] + if cap(b.writes) > len(b.writes)*database.MaxExcessCapacityFactor { + b.writes = make([]keyValue, 0, cap(b.writes)/database.CapacityReductionFactor) + } else { + b.writes = b.writes[:0] + } b.size = 0 } diff --git a/database/versiondb/db_test.go b/database/versiondb/db_test.go index 70cf8ff..345655a 100644 --- a/database/versiondb/db_test.go +++ b/database/versiondb/db_test.go @@ -299,6 +299,10 @@ func TestCommitBatch(t *testing.T) { if err := db.Put(key1, value1); err != nil { t.Fatalf("Unexpected error on db.Put: %s", err) + } else if has, err := baseDB.Has(key1); err != nil { + t.Fatalf("Unexpected error on db.Has: %s", err) + } else if has { + t.Fatalf("Unexpected result of db.Has: %v", has) } batch, err := db.CommitBatch() @@ -307,7 +311,11 @@ func TestCommitBatch(t *testing.T) { } db.Abort() - if err := batch.Write(); err != nil { + if has, err := db.Has(key1); err != nil { + t.Fatalf("Unexpected error on db.Has: %s", err) + } else if has { + t.Fatalf("Unexpected result of db.Has: %v", has) + } else if err := batch.Write(); err != nil { t.Fatalf("Unexpected error on batch.Write: %s", err) } diff --git a/ids/short_set.go b/ids/short_set.go index 6977863..9bcd37d 100644 --- a/ids/short_set.go +++ b/ids/short_set.go @@ -57,15 +57,23 @@ func (ids *ShortSet) Remove(idList ...ShortID) { // Clear empties this set func (ids *ShortSet) Clear() { *ids = nil } -// CappedList returns a list of length at most [size]. Size should be >= 0 +// CappedList returns a list of length at most [size]. +// Size should be >= 0. If size < 0, returns nil. func (ids ShortSet) CappedList(size int) []ShortID { - idList := make([]ShortID, size)[:0] + if size < 0 { + return nil + } + if l := ids.Len(); l < size { + size = l + } + i := 0 + idList := make([]ShortID, size) for id := range ids { - if size <= 0 { + if i >= size { break } - size-- - idList = append(idList, NewShortID(id)) + idList[i] = NewShortID(id) + i++ } return idList } diff --git a/snow/consensus/avalanche/topological.go b/snow/consensus/avalanche/topological.go index d786a0f..b8d128e 100644 --- a/snow/consensus/avalanche/topological.go +++ b/snow/consensus/avalanche/topological.go @@ -10,6 +10,10 @@ import ( "github.com/ava-labs/gecko/snow/consensus/snowstorm" ) +const ( + minMapSize = 16 +) + // TopologicalFactory implements Factory by returning a topological struct type TopologicalFactory struct{} @@ -65,12 +69,12 @@ func (ta *Topological) Initialize(ctx *snow.Context, params Parameters, frontier ta.ctx.Log.Error("%s", err) } - ta.nodes = make(map[[32]byte]Vertex) + ta.nodes = make(map[[32]byte]Vertex, minMapSize) ta.cg = &snowstorm.Directed{} ta.cg.Initialize(ctx, params.Parameters) - ta.frontier = make(map[[32]byte]Vertex) + ta.frontier = make(map[[32]byte]Vertex, minMapSize) for _, vtx := range frontier { ta.frontier[vtx.ID().Key()] = vtx } @@ -159,7 +163,7 @@ func (ta *Topological) Finalized() bool { return ta.cg.Finalized() } // the non-transitively applied votes. Also returns the list of leaf nodes. func (ta *Topological) calculateInDegree( responses ids.UniqueBag) (map[[32]byte]kahnNode, []ids.ID) { - kahns := make(map[[32]byte]kahnNode) + kahns := make(map[[32]byte]kahnNode, minMapSize) leaves := ids.Set{} for _, vote := range responses.List() { @@ -233,7 +237,7 @@ func (ta *Topological) pushVotes( kahnNodes map[[32]byte]kahnNode, leaves []ids.ID) ids.Bag { votes := make(ids.UniqueBag) - txConflicts := make(map[[32]byte]ids.Set) + txConflicts := make(map[[32]byte]ids.Set, minMapSize) for len(leaves) > 0 { newLeavesSize := len(leaves) - 1 @@ -443,9 +447,9 @@ func (ta *Topological) updateFrontiers() error { ta.preferred.Clear() ta.virtuous.Clear() ta.orphans.Clear() - ta.frontier = make(map[[32]byte]Vertex) - ta.preferenceCache = make(map[[32]byte]bool) - ta.virtuousCache = make(map[[32]byte]bool) + ta.frontier = make(map[[32]byte]Vertex, minMapSize) + ta.preferenceCache = make(map[[32]byte]bool, minMapSize) + ta.virtuousCache = make(map[[32]byte]bool, minMapSize) ta.orphans.Union(ta.cg.Virtuous()) // Initially, nothing is preferred diff --git a/snow/consensus/snowman/topological.go b/snow/consensus/snowman/topological.go index 6f98751..51612db 100644 --- a/snow/consensus/snowman/topological.go +++ b/snow/consensus/snowman/topological.go @@ -9,6 +9,10 @@ import ( "github.com/ava-labs/gecko/snow/consensus/snowball" ) +const ( + minMapSize = 16 +) + // TopologicalFactory implements Factory by returning a topological struct type TopologicalFactory struct{} @@ -183,7 +187,7 @@ func (ts *Topological) Finalized() bool { return len(ts.blocks) == 1 } // the non-transitively applied votes. Also returns the list of leaf blocks. func (ts *Topological) calculateInDegree( votes ids.Bag) (map[[32]byte]kahnNode, []ids.ID) { - kahns := make(map[[32]byte]kahnNode) + kahns := make(map[[32]byte]kahnNode, minMapSize) leaves := ids.Set{} for _, vote := range votes.List() { diff --git a/snow/validators/set.go b/snow/validators/set.go index 50210bf..610a85f 100644 --- a/snow/validators/set.go +++ b/snow/validators/set.go @@ -13,6 +13,19 @@ import ( "github.com/ava-labs/gecko/utils/random" ) +const ( + // maxExcessCapacityFactor ... + // If, when the validator set is reset, cap(set)/len(set) > MaxExcessCapacityFactor, + // the underlying arrays' capacities will be reduced by a factor of capacityReductionFactor. + // Higher value for maxExcessCapacityFactor --> less aggressive array downsizing --> less memory allocations + // but more unnecessary data in the underlying array that can't be garbage collected. + // Higher value for capacityReductionFactor --> more aggressive array downsizing --> more memory allocations + // but less unnecessary data in the underlying array that can't be garbage collected. + maxExcessCapacityFactor = 4 + // CapacityReductionFactor ... + capacityReductionFactor = 2 +) + // Set of validators that can be sampled type Set interface { fmt.Stringer @@ -71,9 +84,21 @@ func (s *set) Set(vdrs []Validator) { } func (s *set) set(vdrs []Validator) { - s.vdrMap = make(map[[20]byte]int, len(vdrs)) - s.vdrSlice = s.vdrSlice[:0] - s.sampler.Weights = s.sampler.Weights[:0] + lenVdrs := len(vdrs) + // If the underlying arrays are much larger than necessary, resize them to + // allow garbage collection of unused memory + if cap(s.vdrSlice) > len(s.vdrSlice)*maxExcessCapacityFactor { + newCap := cap(s.vdrSlice) / capacityReductionFactor + if newCap < lenVdrs { + newCap = lenVdrs + } + s.vdrSlice = make([]Validator, 0, newCap) + s.sampler.Weights = make([]uint64, 0, newCap) + } else { + s.vdrSlice = s.vdrSlice[:0] + s.sampler.Weights = s.sampler.Weights[:0] + } + s.vdrMap = make(map[[20]byte]int, lenVdrs) for _, vdr := range vdrs { s.add(vdr) diff --git a/vms/avm/export_tx_test.go b/vms/avm/export_tx_test.go index 75b359f..96c7733 100644 --- a/vms/avm/export_tx_test.go +++ b/vms/avm/export_tx_test.go @@ -9,6 +9,7 @@ import ( "github.com/ava-labs/gecko/chains/atomic" "github.com/ava-labs/gecko/database/memdb" + "github.com/ava-labs/gecko/database/prefixdb" "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" @@ -117,9 +118,10 @@ func TestIssueExportTx(t *testing.T) { genesisBytes := BuildGenesisTest(t) issuer := make(chan common.Message, 1) + baseDB := memdb.New() sm := &atomic.SharedMemory{} - sm.Initialize(logging.NoLog{}, memdb.New()) + sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB)) ctx := snow.DefaultContextTest() ctx.NetworkID = networkID @@ -138,7 +140,7 @@ func TestIssueExportTx(t *testing.T) { } err := vm.Initialize( ctx, - memdb.New(), + prefixdb.New([]byte{1}, baseDB), genesisBytes, issuer, []*common.Fx{{ @@ -273,9 +275,10 @@ func TestClearForceAcceptedExportTx(t *testing.T) { genesisBytes := BuildGenesisTest(t) issuer := make(chan common.Message, 1) + baseDB := memdb.New() sm := &atomic.SharedMemory{} - sm.Initialize(logging.NoLog{}, memdb.New()) + sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB)) ctx := snow.DefaultContextTest() ctx.NetworkID = networkID @@ -294,7 +297,7 @@ func TestClearForceAcceptedExportTx(t *testing.T) { } err := vm.Initialize( ctx, - memdb.New(), + prefixdb.New([]byte{1}, baseDB), genesisBytes, issuer, []*common.Fx{{ diff --git a/vms/avm/import_tx_test.go b/vms/avm/import_tx_test.go index e510aff..750d402 100644 --- a/vms/avm/import_tx_test.go +++ b/vms/avm/import_tx_test.go @@ -9,6 +9,7 @@ import ( "github.com/ava-labs/gecko/chains/atomic" "github.com/ava-labs/gecko/database/memdb" + "github.com/ava-labs/gecko/database/prefixdb" "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" @@ -106,9 +107,10 @@ func TestIssueImportTx(t *testing.T) { genesisBytes := BuildGenesisTest(t) issuer := make(chan common.Message, 1) + baseDB := memdb.New() sm := &atomic.SharedMemory{} - sm.Initialize(logging.NoLog{}, memdb.New()) + sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB)) ctx := snow.DefaultContextTest() ctx.NetworkID = networkID @@ -127,7 +129,7 @@ func TestIssueImportTx(t *testing.T) { } err := vm.Initialize( ctx, - memdb.New(), + prefixdb.New([]byte{1}, baseDB), genesisBytes, issuer, []*common.Fx{{ @@ -265,9 +267,10 @@ func TestForceAcceptImportTx(t *testing.T) { genesisBytes := BuildGenesisTest(t) issuer := make(chan common.Message, 1) + baseDB := memdb.New() sm := &atomic.SharedMemory{} - sm.Initialize(logging.NoLog{}, memdb.New()) + sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, baseDB)) ctx := snow.DefaultContextTest() ctx.NetworkID = networkID @@ -285,7 +288,7 @@ func TestForceAcceptImportTx(t *testing.T) { err := vm.Initialize( ctx, - memdb.New(), + prefixdb.New([]byte{1}, baseDB), genesisBytes, issuer, []*common.Fx{{ diff --git a/vms/avm/vm.go b/vms/avm/vm.go index 715ce95..026760c 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -492,10 +492,10 @@ func (vm *VM) parseTx(b []byte) (*UniqueTx, error) { if err := vm.state.SetTx(tx.ID(), tx.Tx); err != nil { return nil, err } - if err := tx.setStatus(choices.Processing); err != nil { return nil, err } + return tx, vm.db.Commit() } return tx, nil diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index baff040..8b9350f 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -18,6 +18,7 @@ import ( "github.com/ava-labs/gecko/snow/consensus/snowman" "github.com/ava-labs/gecko/snow/engine/common" "github.com/ava-labs/gecko/snow/validators" + "github.com/ava-labs/gecko/utils/codec" "github.com/ava-labs/gecko/utils/crypto" "github.com/ava-labs/gecko/utils/formatting" "github.com/ava-labs/gecko/utils/logging" @@ -26,7 +27,6 @@ import ( "github.com/ava-labs/gecko/utils/units" "github.com/ava-labs/gecko/utils/wrappers" "github.com/ava-labs/gecko/vms/components/ava" - "github.com/ava-labs/gecko/utils/codec" "github.com/ava-labs/gecko/vms/components/core" "github.com/ava-labs/gecko/vms/secp256k1fx" ) @@ -808,9 +808,11 @@ func (vm *VM) getValidators(validatorEvents *EventHeap) []validators.Validator { validator.Wght = weight } - vdrList := make([]validators.Validator, len(vdrMap))[:0] + vdrList := make([]validators.Validator, len(vdrMap)) + i := 0 for _, validator := range vdrMap { - vdrList = append(vdrList, validator) + vdrList[i] = validator + i++ } return vdrList } diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index dcee89a..82989eb 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -137,7 +137,7 @@ func defaultVM() *VM { vm.validators.PutValidatorSet(DefaultSubnetID, defaultSubnet) vm.clock.Set(defaultGenesisTime) - db := memdb.New() + db := prefixdb.New([]byte{0}, memdb.New()) msgChan := make(chan common.Message, 1) ctx := defaultContext() ctx.Lock.Lock() @@ -1189,7 +1189,7 @@ func TestAtomicImport(t *testing.T) { key := keys[0] sm := &atomic.SharedMemory{} - sm.Initialize(logging.NoLog{}, memdb.New()) + sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, vm.DB.GetDatabase())) vm.Ctx.SharedMemory = sm.NewBlockchainSharedMemory(vm.Ctx.ChainID) @@ -1282,7 +1282,7 @@ func TestOptimisticAtomicImport(t *testing.T) { key := keys[0] sm := &atomic.SharedMemory{} - sm.Initialize(logging.NoLog{}, memdb.New()) + sm.Initialize(logging.NoLog{}, prefixdb.New([]byte{0}, vm.DB.GetDatabase())) vm.Ctx.SharedMemory = sm.NewBlockchainSharedMemory(vm.Ctx.ChainID)