mirror of https://github.com/poanetwork/gecko.git
529 lines
13 KiB
Go
529 lines
13 KiB
Go
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
|
|
// See the file LICENSE for licensing terms.
|
|
|
|
package avm
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gorilla/rpc/v2"
|
|
|
|
"github.com/ava-labs/gecko/cache"
|
|
"github.com/ava-labs/gecko/database"
|
|
"github.com/ava-labs/gecko/database/versiondb"
|
|
"github.com/ava-labs/gecko/ids"
|
|
"github.com/ava-labs/gecko/snow"
|
|
"github.com/ava-labs/gecko/snow/choices"
|
|
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
|
|
"github.com/ava-labs/gecko/snow/engine/common"
|
|
"github.com/ava-labs/gecko/utils/formatting"
|
|
"github.com/ava-labs/gecko/utils/timer"
|
|
"github.com/ava-labs/gecko/utils/wrappers"
|
|
"github.com/ava-labs/gecko/vms/components/ava"
|
|
"github.com/ava-labs/gecko/vms/components/codec"
|
|
|
|
cjson "github.com/ava-labs/gecko/utils/json"
|
|
)
|
|
|
|
// Tunable parameters for transaction batching, cache sizing and address
// formatting.
const (
	// batchTimeout is the delay assigned to VM.batchTimeout by Initialize.
	batchTimeout = time.Second
	// batchSize is the number of pending transactions at which issueTx
	// flushes immediately instead of waiting for the timer.
	batchSize = 30

	// Sizes of the LRU caches constructed in Initialize.
	stateCacheSize = 10000
	idCacheSize    = 10000
	txCacheSize    = 10000

	// addressSep separates the chain alias from the CB58 encoded address in
	// the textual form handled by Parse and Format.
	addressSep = "-"
)
|
|
|
|
// Sentinel errors produced by this file.
var (
	// Feature extension errors.
	errIncompatibleFx = errors.New("incompatible feature extension")
	errUnknownFx      = errors.New("unknown feature extension")

	// Genesis processing errors.
	errGenesisAssetMustHaveState = errors.New("genesis asset must have non-empty state")

	// Address parsing errors.
	errInvalidAddress    = errors.New("invalid address")
	errWrongBlockchainID = errors.New("wrong blockchain ID")
)
|
|
|
|
// VM implements the avalanche.DAGVM interface
|
|
type VM struct {
|
|
ids.Aliaser
|
|
|
|
// Contains information of where this VM is executing
|
|
ctx *snow.Context
|
|
|
|
// Used to check local time
|
|
clock timer.Clock
|
|
|
|
codec codec.Codec
|
|
|
|
pubsub *cjson.PubSubServer
|
|
|
|
// State management
|
|
state *prefixedState
|
|
|
|
// Transaction issuing
|
|
timer *timer.Timer
|
|
batchTimeout time.Duration
|
|
txs []snowstorm.Tx
|
|
toEngine chan<- common.Message
|
|
|
|
baseDB database.Database
|
|
db *versiondb.Database
|
|
|
|
typeToFxIndex map[reflect.Type]int
|
|
fxs []*parsedFx
|
|
}
|
|
|
|
type codecRegistry struct {
|
|
index int
|
|
typeToFxIndex map[reflect.Type]int
|
|
codec codec.Codec
|
|
}
|
|
|
|
func (cr *codecRegistry) RegisterType(val interface{}) error {
|
|
valType := reflect.TypeOf(val)
|
|
cr.typeToFxIndex[valType] = cr.index
|
|
return cr.codec.RegisterType(val)
|
|
}
|
|
// Marshal forwards to the wrapped codec unchanged.
func (cr *codecRegistry) Marshal(val interface{}) ([]byte, error) {
	return cr.codec.Marshal(val)
}
|
|
// Unmarshal forwards to the wrapped codec unchanged.
func (cr *codecRegistry) Unmarshal(b []byte, val interface{}) error {
	return cr.codec.Unmarshal(b, val)
}
|
|
|
|
/*
|
|
******************************************************************************
|
|
******************************** Avalanche API *******************************
|
|
******************************************************************************
|
|
*/
|
|
|
|
// Initialize implements the avalanche.DAGVM interface
|
|
func (vm *VM) Initialize(
|
|
ctx *snow.Context,
|
|
db database.Database,
|
|
genesisBytes []byte,
|
|
toEngine chan<- common.Message,
|
|
fxs []*common.Fx,
|
|
) error {
|
|
vm.ctx = ctx
|
|
vm.toEngine = toEngine
|
|
vm.baseDB = db
|
|
vm.db = versiondb.New(db)
|
|
vm.typeToFxIndex = map[reflect.Type]int{}
|
|
vm.Aliaser.Initialize()
|
|
|
|
vm.pubsub = cjson.NewPubSubServer(ctx)
|
|
|
|
errs := wrappers.Errs{}
|
|
errs.Add(
|
|
vm.pubsub.Register("accepted"),
|
|
vm.pubsub.Register("rejected"),
|
|
vm.pubsub.Register("verified"),
|
|
)
|
|
if errs.Errored() {
|
|
return errs.Err
|
|
}
|
|
|
|
c := codec.NewDefault()
|
|
c.RegisterType(&BaseTx{})
|
|
c.RegisterType(&CreateAssetTx{})
|
|
c.RegisterType(&OperationTx{})
|
|
c.RegisterType(&ImportTx{})
|
|
c.RegisterType(&ExportTx{})
|
|
|
|
vm.fxs = make([]*parsedFx, len(fxs))
|
|
for i, fxContainer := range fxs {
|
|
if fxContainer == nil {
|
|
return errIncompatibleFx
|
|
}
|
|
fx, ok := fxContainer.Fx.(Fx)
|
|
if !ok {
|
|
return errIncompatibleFx
|
|
}
|
|
vm.fxs[i] = &parsedFx{
|
|
ID: fxContainer.ID,
|
|
Fx: fx,
|
|
}
|
|
vm.codec = &codecRegistry{
|
|
index: i,
|
|
typeToFxIndex: vm.typeToFxIndex,
|
|
codec: c,
|
|
}
|
|
if err := fx.Initialize(vm); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
vm.codec = c
|
|
|
|
vm.state = &prefixedState{
|
|
state: &state{State: ava.State{
|
|
Cache: &cache.LRU{Size: stateCacheSize},
|
|
DB: vm.db,
|
|
Codec: vm.codec,
|
|
}},
|
|
|
|
tx: &cache.LRU{Size: idCacheSize},
|
|
utxo: &cache.LRU{Size: idCacheSize},
|
|
txStatus: &cache.LRU{Size: idCacheSize},
|
|
funds: &cache.LRU{Size: idCacheSize},
|
|
|
|
uniqueTx: &cache.EvictableLRU{Size: txCacheSize},
|
|
}
|
|
|
|
if err := vm.initAliases(genesisBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
if dbStatus, err := vm.state.DBInitialized(); err != nil || dbStatus == choices.Unknown {
|
|
if err := vm.initState(genesisBytes); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
vm.timer = timer.NewTimer(func() {
|
|
ctx.Lock.Lock()
|
|
defer ctx.Lock.Unlock()
|
|
|
|
vm.FlushTxs()
|
|
})
|
|
go ctx.Log.RecoverAndPanic(vm.timer.Dispatch)
|
|
vm.batchTimeout = batchTimeout
|
|
|
|
return vm.db.Commit()
|
|
}
|
|
|
|
// Shutdown implements the avalanche.DAGVM interface
|
|
func (vm *VM) Shutdown() {
|
|
vm.timer.Stop()
|
|
if err := vm.baseDB.Close(); err != nil {
|
|
vm.ctx.Log.Error("Closing the database failed with %s", err)
|
|
}
|
|
}
|
|
|
|
// CreateHandlers implements the avalanche.DAGVM interface
|
|
func (vm *VM) CreateHandlers() map[string]*common.HTTPHandler {
|
|
rpcServer := rpc.NewServer()
|
|
codec := cjson.NewCodec()
|
|
rpcServer.RegisterCodec(codec, "application/json")
|
|
rpcServer.RegisterCodec(codec, "application/json;charset=UTF-8")
|
|
rpcServer.RegisterService(&Service{vm: vm}, "avm") // name this service "avm"
|
|
|
|
return map[string]*common.HTTPHandler{
|
|
"": &common.HTTPHandler{Handler: rpcServer},
|
|
"/pubsub": &common.HTTPHandler{LockOptions: common.NoLock, Handler: vm.pubsub},
|
|
}
|
|
}
|
|
|
|
// CreateStaticHandlers implements the avalanche.DAGVM interface
|
|
func (vm *VM) CreateStaticHandlers() map[string]*common.HTTPHandler {
|
|
newServer := rpc.NewServer()
|
|
codec := cjson.NewCodec()
|
|
newServer.RegisterCodec(codec, "application/json")
|
|
newServer.RegisterCodec(codec, "application/json;charset=UTF-8")
|
|
newServer.RegisterService(&StaticService{}, "avm") // name this service "avm"
|
|
return map[string]*common.HTTPHandler{
|
|
"": &common.HTTPHandler{LockOptions: common.WriteLock, Handler: newServer},
|
|
}
|
|
}
|
|
|
|
// PendingTxs implements the avalanche.DAGVM interface
|
|
func (vm *VM) PendingTxs() []snowstorm.Tx {
|
|
vm.timer.Cancel()
|
|
|
|
txs := vm.txs
|
|
vm.txs = nil
|
|
return txs
|
|
}
|
|
|
|
// ParseTx implements the avalanche.DAGVM interface
|
|
func (vm *VM) ParseTx(b []byte) (snowstorm.Tx, error) { return vm.parseTx(b) }
|
|
|
|
// GetTx implements the avalanche.DAGVM interface
|
|
func (vm *VM) GetTx(txID ids.ID) (snowstorm.Tx, error) {
|
|
tx := &UniqueTx{
|
|
vm: vm,
|
|
txID: txID,
|
|
}
|
|
// Verify must be called in the case the that tx was flushed from the unique
|
|
// cache.
|
|
return tx, tx.Verify()
|
|
}
|
|
|
|
/*
|
|
******************************************************************************
|
|
********************************** JSON API **********************************
|
|
******************************************************************************
|
|
*/
|
|
|
|
// IssueTx attempts to send a transaction to consensus.
|
|
// If onDecide is specified, the function will be called when the transaction is
|
|
// either accepted or rejected with the appropriate status. This function will
|
|
// go out of scope when the transaction is removed from memory.
|
|
func (vm *VM) IssueTx(b []byte, onDecide func(choices.Status)) (ids.ID, error) {
|
|
tx, err := vm.parseTx(b)
|
|
if err != nil {
|
|
return ids.ID{}, err
|
|
}
|
|
if err := tx.Verify(); err != nil {
|
|
return ids.ID{}, err
|
|
}
|
|
vm.issueTx(tx)
|
|
tx.onDecide = onDecide
|
|
return tx.ID(), nil
|
|
}
|
|
|
|
// GetUTXOs returns the utxos that at least one of the provided addresses is
|
|
// referenced in.
|
|
func (vm *VM) GetUTXOs(addrs ids.Set) ([]*ava.UTXO, error) {
|
|
utxoIDs := ids.Set{}
|
|
for _, addr := range addrs.List() {
|
|
utxos, _ := vm.state.Funds(addr)
|
|
utxoIDs.Add(utxos...)
|
|
}
|
|
|
|
utxos := []*ava.UTXO{}
|
|
for _, utxoID := range utxoIDs.List() {
|
|
utxo, err := vm.state.UTXO(utxoID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
utxos = append(utxos, utxo)
|
|
}
|
|
return utxos, nil
|
|
}
|
|
|
|
/*
|
|
******************************************************************************
|
|
*********************************** Fx API ***********************************
|
|
******************************************************************************
|
|
*/
|
|
|
|
// Clock returns a reference to the internal clock of this VM
|
|
func (vm *VM) Clock() *timer.Clock { return &vm.clock }
|
|
|
|
// Codec returns a reference to the internal codec of this VM
|
|
func (vm *VM) Codec() codec.Codec { return vm.codec }
|
|
|
|
/*
|
|
******************************************************************************
|
|
********************************** Timer API *********************************
|
|
******************************************************************************
|
|
*/
|
|
|
|
// FlushTxs into consensus
|
|
func (vm *VM) FlushTxs() {
|
|
vm.timer.Cancel()
|
|
if len(vm.txs) != 0 {
|
|
select {
|
|
case vm.toEngine <- common.PendingTxs:
|
|
default:
|
|
vm.ctx.Log.Warn("Delaying issuance of transactions due to contention")
|
|
vm.timer.SetTimeoutIn(vm.batchTimeout)
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
******************************************************************************
|
|
********************************** Helpers ***********************************
|
|
******************************************************************************
|
|
*/
|
|
|
|
func (vm *VM) initAliases(genesisBytes []byte) error {
|
|
genesis := Genesis{}
|
|
if err := vm.codec.Unmarshal(genesisBytes, &genesis); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, genesisTx := range genesis.Txs {
|
|
if len(genesisTx.Outs) != 0 {
|
|
return errGenesisAssetMustHaveState
|
|
}
|
|
|
|
tx := Tx{
|
|
UnsignedTx: &genesisTx.CreateAssetTx,
|
|
}
|
|
txBytes, err := vm.codec.Marshal(&tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tx.Initialize(txBytes)
|
|
|
|
txID := tx.ID()
|
|
|
|
vm.Alias(txID, genesisTx.Alias)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (vm *VM) initState(genesisBytes []byte) error {
|
|
genesis := Genesis{}
|
|
if err := vm.codec.Unmarshal(genesisBytes, &genesis); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, genesisTx := range genesis.Txs {
|
|
if len(genesisTx.Outs) != 0 {
|
|
return errGenesisAssetMustHaveState
|
|
}
|
|
|
|
tx := Tx{
|
|
UnsignedTx: &genesisTx.CreateAssetTx,
|
|
}
|
|
txBytes, err := vm.codec.Marshal(&tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tx.Initialize(txBytes)
|
|
|
|
txID := tx.ID()
|
|
|
|
vm.ctx.Log.Info("Initializing with AssetID %s", txID)
|
|
|
|
if err := vm.state.SetTx(txID, &tx); err != nil {
|
|
return err
|
|
}
|
|
if err := vm.state.SetStatus(txID, choices.Accepted); err != nil {
|
|
return err
|
|
}
|
|
for _, utxo := range tx.UTXOs() {
|
|
if err := vm.state.FundUTXO(utxo); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return vm.state.SetDBInitialized(choices.Processing)
|
|
}
|
|
|
|
func (vm *VM) parseTx(b []byte) (*UniqueTx, error) {
|
|
rawTx := &Tx{}
|
|
err := vm.codec.Unmarshal(b, rawTx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rawTx.Initialize(b)
|
|
|
|
tx := &UniqueTx{
|
|
TxState: &TxState{
|
|
Tx: rawTx,
|
|
},
|
|
vm: vm,
|
|
txID: rawTx.ID(),
|
|
}
|
|
if err := tx.SyntacticVerify(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if tx.Status() == choices.Unknown {
|
|
if err := vm.state.SetTx(tx.ID(), tx.Tx); err != nil {
|
|
return nil, err
|
|
}
|
|
tx.setStatus(choices.Processing)
|
|
}
|
|
|
|
return tx, nil
|
|
}
|
|
|
|
func (vm *VM) issueTx(tx snowstorm.Tx) {
|
|
vm.txs = append(vm.txs, tx)
|
|
switch {
|
|
case len(vm.txs) == batchSize:
|
|
vm.FlushTxs()
|
|
case len(vm.txs) == 1:
|
|
vm.timer.SetTimeoutIn(vm.batchTimeout)
|
|
}
|
|
}
|
|
|
|
func (vm *VM) getUTXO(utxoID *ava.UTXOID) (*ava.UTXO, error) {
|
|
inputID := utxoID.InputID()
|
|
utxo, err := vm.state.UTXO(inputID)
|
|
if err == nil {
|
|
return utxo, nil
|
|
}
|
|
|
|
inputTx, inputIndex := utxoID.InputSource()
|
|
parent := UniqueTx{
|
|
vm: vm,
|
|
txID: inputTx,
|
|
}
|
|
|
|
if err := parent.Verify(); err != nil {
|
|
return nil, errMissingUTXO
|
|
} else if status := parent.Status(); status.Decided() {
|
|
return nil, errMissingUTXO
|
|
}
|
|
|
|
parentUTXOs := parent.UTXOs()
|
|
if uint32(len(parentUTXOs)) <= inputIndex || int(inputIndex) < 0 {
|
|
return nil, errInvalidUTXO
|
|
}
|
|
return parentUTXOs[int(inputIndex)], nil
|
|
}
|
|
|
|
func (vm *VM) getFx(val interface{}) (int, error) {
|
|
valType := reflect.TypeOf(val)
|
|
fx, exists := vm.typeToFxIndex[valType]
|
|
if !exists {
|
|
return 0, errUnknownFx
|
|
}
|
|
return fx, nil
|
|
}
|
|
|
|
func (vm *VM) verifyFxUsage(fxID int, assetID ids.ID) bool {
|
|
tx := &UniqueTx{
|
|
vm: vm,
|
|
txID: assetID,
|
|
}
|
|
if status := tx.Status(); !status.Fetched() {
|
|
return false
|
|
}
|
|
createAssetTx, ok := tx.UnsignedTx.(*CreateAssetTx)
|
|
if !ok {
|
|
return false
|
|
}
|
|
// TODO: This could be a binary search to import performance... Or perhaps
|
|
// make a map
|
|
for _, state := range createAssetTx.States {
|
|
if state.FxID == uint32(fxID) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Parse ...
|
|
func (vm *VM) Parse(addrStr string) ([]byte, error) {
|
|
if count := strings.Count(addrStr, addressSep); count != 1 {
|
|
return nil, errInvalidAddress
|
|
}
|
|
addressParts := strings.SplitN(addrStr, addressSep, 2)
|
|
bcAlias := addressParts[0]
|
|
rawAddr := addressParts[1]
|
|
bcID, err := vm.ctx.BCLookup.Lookup(bcAlias)
|
|
if err != nil {
|
|
bcID, err = ids.FromString(bcAlias)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if !bcID.Equals(vm.ctx.ChainID) {
|
|
return nil, errWrongBlockchainID
|
|
}
|
|
cb58 := formatting.CB58{}
|
|
err = cb58.FromString(rawAddr)
|
|
return cb58.Bytes, err
|
|
}
|
|
|
|
// Format ...
|
|
func (vm *VM) Format(b []byte) string {
|
|
var bcAlias string
|
|
if alias, err := vm.ctx.BCLookup.PrimaryAlias(vm.ctx.ChainID); err == nil {
|
|
bcAlias = alias
|
|
} else {
|
|
bcAlias = vm.ctx.ChainID.String()
|
|
}
|
|
return fmt.Sprintf("%s%s%s", bcAlias, addressSep, formatting.CB58{Bytes: b})
|
|
}
|