From bb93cc3eeebc5ebd0440b465624aecf84ade428f Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Thu, 19 Mar 2020 16:56:58 -0400 Subject: [PATCH] Added atomic shared memory --- chains/atomic/blockchain_memory.go | 28 +++++++ chains/atomic/blockchain_memory_test.go | 34 ++++++++ chains/atomic/memory.go | 105 ++++++++++++++++++++++++ chains/atomic/memory_test.go | 69 ++++++++++++++++ chains/atomic/writer.go | 21 +++++ chains/atomic/writer_test.go | 61 ++++++++++++++ 6 files changed, 318 insertions(+) create mode 100644 chains/atomic/blockchain_memory.go create mode 100644 chains/atomic/blockchain_memory_test.go create mode 100644 chains/atomic/memory.go create mode 100644 chains/atomic/memory_test.go create mode 100644 chains/atomic/writer.go create mode 100644 chains/atomic/writer_test.go diff --git a/chains/atomic/blockchain_memory.go b/chains/atomic/blockchain_memory.go new file mode 100644 index 0000000..a02a85a --- /dev/null +++ b/chains/atomic/blockchain_memory.go @@ -0,0 +1,28 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package atomic + +import ( + "github.com/ava-labs/gecko/database" + "github.com/ava-labs/gecko/ids" +) + +// BlockchainSharedMemory provides the API for a blockchain to interact with +// shared memory of another blockchain +type BlockchainSharedMemory struct { + blockchainID ids.ID + sm *SharedMemory +} + +// GetDatabase returns and locks the provided DB +func (bsm *BlockchainSharedMemory) GetDatabase(id ids.ID) database.Database { + sharedID := bsm.sm.sharedID(id, bsm.blockchainID) + return bsm.sm.GetDatabase(sharedID) +} + +// ReleaseDatabase unlocks the provided DB +func (bsm *BlockchainSharedMemory) ReleaseDatabase(id ids.ID) { + sharedID := bsm.sm.sharedID(id, bsm.blockchainID) + bsm.sm.ReleaseDatabase(sharedID) +} diff --git a/chains/atomic/blockchain_memory_test.go b/chains/atomic/blockchain_memory_test.go new file mode 100644 index 0000000..318ae0d --- /dev/null +++ b/chains/atomic/blockchain_memory_test.go @@ -0,0 +1,34 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package atomic + +import ( + "bytes" + "testing" + + "github.com/ava-labs/gecko/database/memdb" + "github.com/ava-labs/gecko/utils/logging" +) + +func TestBlockchainSharedMemory(t *testing.T) { + sm := SharedMemory{} + sm.Initialize(logging.NoLog{}, memdb.New()) + + bsm0 := sm.NewBlockchainSharedMemory(blockchainID0) + bsm1 := sm.NewBlockchainSharedMemory(blockchainID1) + + sharedDB0 := bsm0.GetDatabase(blockchainID1) + if err := sharedDB0.Put([]byte{1}, []byte{2}); err != nil { + t.Fatal(err) + } + bsm0.ReleaseDatabase(blockchainID1) + + sharedDB1 := bsm1.GetDatabase(blockchainID0) + if value, err := sharedDB1.Get([]byte{1}); err != nil { + t.Fatal(err) + } else if !bytes.Equal(value, []byte{2}) { + t.Fatalf("database.Get Returned: 0x%x ; Expected: 0x%x", value, []byte{2}) + } + bsm1.ReleaseDatabase(blockchainID0) +} diff --git a/chains/atomic/memory.go b/chains/atomic/memory.go new file mode 100644 index 0000000..448e6c9 --- /dev/null +++ b/chains/atomic/memory.go @@ -0,0 +1,105 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package atomic + +import ( + "bytes" + "sync" + + "github.com/ava-labs/gecko/database" + "github.com/ava-labs/gecko/database/prefixdb" + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/utils/hashing" + "github.com/ava-labs/gecko/utils/logging" + "github.com/ava-labs/gecko/vms/components/codec" +) + +type rcLock struct { + lock sync.Mutex + count int +} + +// SharedMemory is the interface for shared memory inside a subnet +type SharedMemory struct { + lock sync.Mutex + log logging.Logger + codec codec.Codec + locks map[[32]byte]*rcLock + db database.Database +} + +// Initialize the SharedMemory +func (sm *SharedMemory) Initialize(log logging.Logger, db database.Database) { + sm.log = log + sm.codec = codec.NewDefault() + sm.locks = make(map[[32]byte]*rcLock) + sm.db = db +} + +// NewBlockchainSharedMemory returns a new BlockchainSharedMemory +func (sm *SharedMemory) NewBlockchainSharedMemory(id ids.ID) *BlockchainSharedMemory { + return &BlockchainSharedMemory{ + blockchainID: id, + sm: sm, + } +} + +// GetDatabase returns and locks the provided DB +func (sm *SharedMemory) GetDatabase(id ids.ID) database.Database { + lock := sm.makeLock(id) + lock.Lock() + + return prefixdb.New(id.Bytes(), sm.db) +} + +// ReleaseDatabase unlocks the provided DB +func (sm *SharedMemory) ReleaseDatabase(id ids.ID) { + lock := sm.releaseLock(id) + lock.Unlock() +} + +func (sm *SharedMemory) makeLock(id ids.ID) *sync.Mutex { + sm.lock.Lock() + defer sm.lock.Unlock() + + key := id.Key() + rc, exists := sm.locks[key] + if !exists { + rc = &rcLock{} + sm.locks[key] = rc + } + rc.count++ + return &rc.lock +} + +func (sm *SharedMemory) releaseLock(id ids.ID) *sync.Mutex { + sm.lock.Lock() + defer sm.lock.Unlock() + + key := id.Key() + rc, exists := sm.locks[key] + if !exists { + panic("Attemping to free an unknown lock") + } + rc.count-- + if rc.count == 0 { + delete(sm.locks, key) + } + return &rc.lock +} + +// sharedID calculates the ID of the shared memory space +func (sm *SharedMemory) sharedID(id1, id2 ids.ID) ids.ID { + idKey1 := id1.Key() + idKey2 := id2.Key() + + if bytes.Compare(idKey1[:], idKey2[:]) == 1 { + idKey1, idKey2 = idKey2, idKey1 + } + + combinedBytes, err := sm.codec.Marshal([2][32]byte{idKey1, idKey2}) + sm.log.AssertNoError(err) + + return ids.NewID(hashing.ComputeHash256Array(combinedBytes)) +} diff --git a/chains/atomic/memory_test.go b/chains/atomic/memory_test.go new file mode 100644 index 0000000..f1cf020 --- /dev/null +++ b/chains/atomic/memory_test.go @@ -0,0 +1,69 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package atomic + +import ( + "testing" + + "github.com/ava-labs/gecko/database/memdb" + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/utils/logging" +) + +var ( + blockchainID0 = ids.Empty.Prefix(0) + blockchainID1 = ids.Empty.Prefix(1) +) + +func TestSharedMemorySharedID(t *testing.T) { + sm := SharedMemory{} + sm.Initialize(logging.NoLog{}, memdb.New()) + + sharedID0 := sm.sharedID(blockchainID0, blockchainID1) + sharedID1 := sm.sharedID(blockchainID1, blockchainID0) + + if !sharedID0.Equals(sharedID1) { + t.Fatalf("SharedMemory.sharedID should be communitive") + } +} + +func TestSharedMemoryMakeReleaseLock(t *testing.T) { + sm := SharedMemory{} + sm.Initialize(logging.NoLog{}, memdb.New()) + + sharedID := sm.sharedID(blockchainID0, blockchainID1) + + lock0 := sm.makeLock(sharedID) + + if lock1 := sm.makeLock(sharedID); lock0 != lock1 { + t.Fatalf("SharedMemory.makeLock should have returned the same lock") + } + sm.releaseLock(sharedID) + + if lock2 := sm.makeLock(sharedID); lock0 != lock2 { + t.Fatalf("SharedMemory.makeLock should have returned the same lock") + } + sm.releaseLock(sharedID) + sm.releaseLock(sharedID) + + if lock3 := sm.makeLock(sharedID); lock0 == lock3 { + t.Fatalf("SharedMemory.releaseLock should have returned freed the lock") + } + sm.releaseLock(sharedID) +} + +func TestSharedMemoryUnknownFree(t *testing.T) { + sm := SharedMemory{} + sm.Initialize(logging.NoLog{}, memdb.New()) + + sharedID := sm.sharedID(blockchainID0, blockchainID1) + + defer func() { + if recover() == nil { + t.Fatalf("Should have panicked due to an unknown free") + } + }() + + sm.releaseLock(sharedID) +} diff --git a/chains/atomic/writer.go b/chains/atomic/writer.go new file mode 100644 index 0000000..bacabab --- /dev/null +++ b/chains/atomic/writer.go @@ -0,0 +1,21 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package atomic + +import ( + "github.com/ava-labs/gecko/database" +) + +// WriteAll assumes all batches have the same underlying database. Batches +// should not be modified after being passed to this function. +func WriteAll(baseBatch database.Batch, batches ...database.Batch) error { + baseBatch = baseBatch.Inner() + for _, batch := range batches { + batch = batch.Inner() + if err := batch.Replay(baseBatch); err != nil { + return err + } + } + return baseBatch.Write() +} diff --git a/chains/atomic/writer_test.go b/chains/atomic/writer_test.go new file mode 100644 index 0000000..8c79519 --- /dev/null +++ b/chains/atomic/writer_test.go @@ -0,0 +1,61 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package atomic + +import ( + "bytes" + "testing" + + "github.com/ava-labs/gecko/database/memdb" + "github.com/ava-labs/gecko/database/prefixdb" + "github.com/ava-labs/gecko/database/versiondb" + "github.com/ava-labs/gecko/utils/logging" +) + +func TestWriteAll(t *testing.T) { + baseDB := memdb.New() + prefixedDBChain := prefixdb.New([]byte{0}, baseDB) + prefixedDBSharedMemory := prefixdb.New([]byte{1}, baseDB) + + sm := SharedMemory{} + sm.Initialize(logging.NoLog{}, prefixedDBSharedMemory) + + sharedID := sm.sharedID(blockchainID0, blockchainID1) + + sharedDB := sm.GetDatabase(sharedID) + + writeDB0 := versiondb.New(prefixedDBChain) + writeDB1 := versiondb.New(sharedDB) + defer sm.ReleaseDatabase(sharedID) + + if err := writeDB0.Put([]byte{1}, []byte{2}); err != nil { + t.Fatal(err) + } + if err := writeDB1.Put([]byte{2}, []byte{3}); err != nil { + t.Fatal(err) + } + + batch0, err := writeDB0.CommitBatch() + if err != nil { + t.Fatal(err) + } + batch1, err := writeDB1.CommitBatch() + if err != nil { + t.Fatal(err) + } + + if err := WriteAll(batch0, batch1); err != nil { + t.Fatal(err) + } + + if value, err := prefixedDBChain.Get([]byte{1}); err != nil { + t.Fatal(err) + } else if !bytes.Equal(value, []byte{2}) { + t.Fatalf("database.Get Returned: 0x%x ; Expected: 0x%x", value, []byte{2}) + } else if value, err := sharedDB.Get([]byte{2}); err != nil { + t.Fatal(err) + } else if !bytes.Equal(value, []byte{3}) { + t.Fatalf("database.Get Returned: 0x%x ; Expected: 0x%x", value, []byte{3}) + } +}