Added atomic shared memory

This commit is contained in:
StephenButtolph 2020-03-19 16:56:58 -04:00
parent 186d96ee40
commit bb93cc3eee
6 changed files with 318 additions and 0 deletions

View File

@ -0,0 +1,28 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic
import (
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/ids"
)
// BlockchainSharedMemory provides the API for a blockchain to interact with
// shared memory of another blockchain
type BlockchainSharedMemory struct {
blockchainID ids.ID
sm *SharedMemory
}
// GetDatabase returns and locks the provided DB
func (bsm *BlockchainSharedMemory) GetDatabase(id ids.ID) database.Database {
sharedID := bsm.sm.sharedID(id, bsm.blockchainID)
return bsm.sm.GetDatabase(sharedID)
}
// ReleaseDatabase unlocks the provided DB
func (bsm *BlockchainSharedMemory) ReleaseDatabase(id ids.ID) {
sharedID := bsm.sm.sharedID(id, bsm.blockchainID)
bsm.sm.ReleaseDatabase(sharedID)
}

View File

@ -0,0 +1,34 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic
import (
"bytes"
"testing"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/utils/logging"
)
func TestBlockchainSharedMemory(t *testing.T) {
sm := SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
bsm0 := sm.NewBlockchainSharedMemory(blockchainID0)
bsm1 := sm.NewBlockchainSharedMemory(blockchainID1)
sharedDB0 := bsm0.GetDatabase(blockchainID1)
if err := sharedDB0.Put([]byte{1}, []byte{2}); err != nil {
t.Fatal(err)
}
bsm0.ReleaseDatabase(blockchainID1)
sharedDB1 := bsm1.GetDatabase(blockchainID0)
if value, err := sharedDB1.Get([]byte{1}); err != nil {
t.Fatal(err)
} else if !bytes.Equal(value, []byte{2}) {
t.Fatalf("database.Get Returned: 0x%x ; Expected: 0x%x", value, []byte{2})
}
bsm1.ReleaseDatabase(blockchainID0)
}

105
chains/atomic/memory.go Normal file
View File

@ -0,0 +1,105 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic
import (
"bytes"
"sync"
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/prefixdb"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/vms/components/codec"
)
type rcLock struct {
lock sync.Mutex
count int
}
// SharedMemory is the interface for shared memory inside a subnet
type SharedMemory struct {
lock sync.Mutex
log logging.Logger
codec codec.Codec
locks map[[32]byte]*rcLock
db database.Database
}
// Initialize the SharedMemory
func (sm *SharedMemory) Initialize(log logging.Logger, db database.Database) {
sm.log = log
sm.codec = codec.NewDefault()
sm.locks = make(map[[32]byte]*rcLock)
sm.db = db
}
// NewBlockchainSharedMemory returns a new BlockchainSharedMemory
func (sm *SharedMemory) NewBlockchainSharedMemory(id ids.ID) *BlockchainSharedMemory {
return &BlockchainSharedMemory{
blockchainID: id,
sm: sm,
}
}
// GetDatabase returns and locks the provided DB
func (sm *SharedMemory) GetDatabase(id ids.ID) database.Database {
lock := sm.makeLock(id)
lock.Lock()
return prefixdb.New(id.Bytes(), sm.db)
}
// ReleaseDatabase unlocks the provided DB
func (sm *SharedMemory) ReleaseDatabase(id ids.ID) {
lock := sm.releaseLock(id)
lock.Unlock()
}
func (sm *SharedMemory) makeLock(id ids.ID) *sync.Mutex {
sm.lock.Lock()
defer sm.lock.Unlock()
key := id.Key()
rc, exists := sm.locks[key]
if !exists {
rc = &rcLock{}
sm.locks[key] = rc
}
rc.count++
return &rc.lock
}
func (sm *SharedMemory) releaseLock(id ids.ID) *sync.Mutex {
sm.lock.Lock()
defer sm.lock.Unlock()
key := id.Key()
rc, exists := sm.locks[key]
if !exists {
panic("Attemping to free an unknown lock")
}
rc.count--
if rc.count == 0 {
delete(sm.locks, key)
}
return &rc.lock
}
// sharedID calculates the ID of the shared memory space
func (sm *SharedMemory) sharedID(id1, id2 ids.ID) ids.ID {
idKey1 := id1.Key()
idKey2 := id2.Key()
if bytes.Compare(idKey1[:], idKey2[:]) == 1 {
idKey1, idKey2 = idKey2, idKey1
}
combinedBytes, err := sm.codec.Marshal([2][32]byte{idKey1, idKey2})
sm.log.AssertNoError(err)
return ids.NewID(hashing.ComputeHash256Array(combinedBytes))
}

View File

@ -0,0 +1,69 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic
import (
"testing"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
)
var (
blockchainID0 = ids.Empty.Prefix(0)
blockchainID1 = ids.Empty.Prefix(1)
)
func TestSharedMemorySharedID(t *testing.T) {
sm := SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sharedID0 := sm.sharedID(blockchainID0, blockchainID1)
sharedID1 := sm.sharedID(blockchainID1, blockchainID0)
if !sharedID0.Equals(sharedID1) {
t.Fatalf("SharedMemory.sharedID should be communitive")
}
}
func TestSharedMemoryMakeReleaseLock(t *testing.T) {
sm := SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sharedID := sm.sharedID(blockchainID0, blockchainID1)
lock0 := sm.makeLock(sharedID)
if lock1 := sm.makeLock(sharedID); lock0 != lock1 {
t.Fatalf("SharedMemory.makeLock should have returned the same lock")
}
sm.releaseLock(sharedID)
if lock2 := sm.makeLock(sharedID); lock0 != lock2 {
t.Fatalf("SharedMemory.makeLock should have returned the same lock")
}
sm.releaseLock(sharedID)
sm.releaseLock(sharedID)
if lock3 := sm.makeLock(sharedID); lock0 == lock3 {
t.Fatalf("SharedMemory.releaseLock should have returned freed the lock")
}
sm.releaseLock(sharedID)
}
func TestSharedMemoryUnknownFree(t *testing.T) {
sm := SharedMemory{}
sm.Initialize(logging.NoLog{}, memdb.New())
sharedID := sm.sharedID(blockchainID0, blockchainID1)
defer func() {
if recover() == nil {
t.Fatalf("Should have panicked due to an unknown free")
}
}()
sm.releaseLock(sharedID)
}

21
chains/atomic/writer.go Normal file
View File

@ -0,0 +1,21 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic
import (
"github.com/ava-labs/gecko/database"
)
// WriteAll assumes all batches have the same underlying database. Batches
// should not be modified after being passed to this function.
func WriteAll(baseBatch database.Batch, batches ...database.Batch) error {
baseBatch = baseBatch.Inner()
for _, batch := range batches {
batch = batch.Inner()
if err := batch.Replay(baseBatch); err != nil {
return err
}
}
return baseBatch.Write()
}

View File

@ -0,0 +1,61 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic
import (
"bytes"
"testing"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/database/prefixdb"
"github.com/ava-labs/gecko/database/versiondb"
"github.com/ava-labs/gecko/utils/logging"
)
func TestWriteAll(t *testing.T) {
baseDB := memdb.New()
prefixedDBChain := prefixdb.New([]byte{0}, baseDB)
prefixedDBSharedMemory := prefixdb.New([]byte{1}, baseDB)
sm := SharedMemory{}
sm.Initialize(logging.NoLog{}, prefixedDBSharedMemory)
sharedID := sm.sharedID(blockchainID0, blockchainID1)
sharedDB := sm.GetDatabase(sharedID)
writeDB0 := versiondb.New(prefixedDBChain)
writeDB1 := versiondb.New(sharedDB)
defer sm.ReleaseDatabase(sharedID)
if err := writeDB0.Put([]byte{1}, []byte{2}); err != nil {
t.Fatal(err)
}
if err := writeDB1.Put([]byte{2}, []byte{3}); err != nil {
t.Fatal(err)
}
batch0, err := writeDB0.CommitBatch()
if err != nil {
t.Fatal(err)
}
batch1, err := writeDB1.CommitBatch()
if err != nil {
t.Fatal(err)
}
if err := WriteAll(batch0, batch1); err != nil {
t.Fatal(err)
}
if value, err := prefixedDBChain.Get([]byte{1}); err != nil {
t.Fatal(err)
} else if !bytes.Equal(value, []byte{2}) {
t.Fatalf("database.Get Returned: 0x%x ; Expected: 0x%x", value, []byte{2})
} else if value, err := sharedDB.Get([]byte{2}); err != nil {
t.Fatal(err)
} else if !bytes.Equal(value, []byte{3}) {
t.Fatalf("database.Get Returned: 0x%x ; Expected: 0x%x", value, []byte{3})
}
}