// (c) 2019-2020, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. package spchainvm import ( "errors" "time" "github.com/gorilla/rpc/v2" "github.com/ava-labs/gecko/cache" "github.com/ava-labs/gecko/database" "github.com/ava-labs/gecko/database/versiondb" "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/choices" "github.com/ava-labs/gecko/snow/consensus/snowman" "github.com/ava-labs/gecko/snow/engine/common" "github.com/ava-labs/gecko/utils/crypto" "github.com/ava-labs/gecko/utils/formatting" "github.com/ava-labs/gecko/utils/timer" "github.com/ava-labs/gecko/utils/wrappers" jsoncodec "github.com/ava-labs/gecko/utils/json" ) const ( batchTimeout = time.Second idCacheSize = 10000 sigCache = 10000 ) var ( maxBatchSize = 30 ) var ( errNoTxs = errors.New("no transactions") errUnknownBlock = errors.New("unknown block") errUnsupportedFXs = errors.New("unsupported feature extensions") ) // VM implements the snowman.ChainVM interface type VM struct { ctx *snow.Context // State management state *prefixedState baseDB database.Database factory crypto.FactorySECP256K1R // The ID of the last accepted block lastAccepted ids.ID // Transaction issuing preferred ids.ID toEngine chan<- common.Message timer *timer.Timer txs []*Tx currentBlocks map[[32]byte]*LiveBlock onAccept func(ids.ID) } /* ****************************************************************************** ********************************* Snowman API ******************************** ****************************************************************************** */ // Initialize implements the snowman.ChainVM interface func (vm *VM) Initialize( ctx *snow.Context, db database.Database, genesisBytes []byte, toEngine chan<- common.Message, fxs []*common.Fx, ) error { if len(fxs) != 0 { return errUnsupportedFXs } vm.ctx = ctx vm.state = &prefixedState{ block: &cache.LRU{Size: idCacheSize}, account: &cache.LRU{Size: idCacheSize}, status: &cache.LRU{Size: idCacheSize}, } vm.baseDB = db vm.factory.Cache.Size = sigCache if dbStatus, err := vm.state.DBInitialized(db); err != nil || dbStatus == choices.Unknown { if err := vm.initState(genesisBytes); err != nil { return err } } vm.timer = timer.NewTimer(func() { ctx.Lock.Lock() defer ctx.Lock.Unlock() vm.FlushTxs() }) go ctx.Log.RecoverAndPanic(vm.timer.Dispatch) vm.toEngine = toEngine lastAccepted, err := vm.state.LastAccepted(db) if err != nil { return err } vm.lastAccepted = lastAccepted vm.currentBlocks = make(map[[32]byte]*LiveBlock) return nil } // Bootstrapping marks this VM as bootstrapping func (vm *VM) Bootstrapping() error { return nil } // Bootstrapped marks this VM as bootstrapped func (vm *VM) Bootstrapped() error { return nil } // Shutdown implements the snowman.ChainVM interface func (vm *VM) Shutdown() error { if vm.timer == nil { return nil } // There is a potential deadlock if the timer is about to execute a timeout. // So, the lock must be released before stopping the timer. vm.ctx.Lock.Unlock() vm.timer.Stop() vm.ctx.Lock.Lock() return vm.baseDB.Close() } // BuildBlock implements the snowman.ChainVM interface func (vm *VM) BuildBlock() (snowman.Block, error) { vm.timer.Cancel() if len(vm.txs) == 0 { return nil, errNoTxs } defer vm.FlushTxs() txs := vm.txs if len(txs) > maxBatchSize { txs = txs[:maxBatchSize] } vm.txs = vm.txs[len(txs):] builder := Builder{ NetworkID: 0, ChainID: vm.ctx.ChainID, } rawBlock, err := builder.NewBlock(vm.preferred, txs) if err != nil { vm.ctx.Log.Warn("Dropping transactions due to %s", err) return nil, err } rawBlock.startVerify(vm.ctx, &vm.factory) block := &LiveBlock{ vm: vm, block: rawBlock, } return block, vm.state.SetBlock(vm.baseDB, rawBlock.ID(), rawBlock) } // ParseBlock implements the snowman.ChainVM interface func (vm *VM) ParseBlock(b []byte) (snowman.Block, error) { c := Codec{} rawBlock, err := c.UnmarshalBlock(b) if err != nil { return nil, err } rawBlock.startVerify(vm.ctx, &vm.factory) block := &LiveBlock{ vm: vm, block: rawBlock, } return block, vm.state.SetBlock(vm.baseDB, rawBlock.ID(), rawBlock) } // GetBlock implements the snowman.ChainVM interface func (vm *VM) GetBlock(id ids.ID) (snowman.Block, error) { blk, err := vm.state.Block(vm.baseDB, id) if err != nil { return nil, err } blk.startVerify(vm.ctx, &vm.factory) return &LiveBlock{ vm: vm, block: blk, }, nil } // SetPreference sets what the current tail of the chain is func (vm *VM) SetPreference(preferred ids.ID) { vm.preferred = preferred } // LastAccepted returns the last accepted block ID func (vm *VM) LastAccepted() ids.ID { return vm.lastAccepted } // CreateHandlers makes new service objects with references to the vm func (vm *VM) CreateHandlers() map[string]*common.HTTPHandler { newServer := rpc.NewServer() codec := jsoncodec.NewCodec() newServer.RegisterCodec(codec, "application/json") newServer.RegisterCodec(codec, "application/json;charset=UTF-8") newServer.RegisterService(&Service{vm: vm}, "spchain") // Name the API service "spchain" return map[string]*common.HTTPHandler{ "": &common.HTTPHandler{LockOptions: common.WriteLock, Handler: newServer}, } } // CreateStaticHandlers makes new service objects without references to the vm func (vm *VM) CreateStaticHandlers() map[string]*common.HTTPHandler { newServer := rpc.NewServer() codec := jsoncodec.NewCodec() newServer.RegisterCodec(codec, "application/json") newServer.RegisterCodec(codec, "application/json;charset=UTF-8") newServer.RegisterService(&StaticService{}, "spchain") // Name the API service "spchain" return map[string]*common.HTTPHandler{ "": &common.HTTPHandler{LockOptions: common.NoLock, Handler: newServer}, } } // IssueTx ... // TODO: Remove this func (vm *VM) IssueTx(b []byte, onDecide func(choices.Status)) (ids.ID, error) { codec := Codec{} tx, err := codec.UnmarshalTx(b) if err != nil { return ids.ID{}, err } tx.startVerify(vm.ctx, &vm.factory) tx.onDecide = onDecide vm.issueTx(tx) return tx.id, nil } // GetAccount returns the account with the specified address func (vm *VM) GetAccount(db database.Database, address ids.ShortID) Account { if account, err := vm.state.Account(db, address.LongID()); err == nil { return account } builder := Builder{ NetworkID: 0, ChainID: vm.ctx.ChainID, } return builder.NewAccount(address, 0, 0) } /* ****************************************************************************** ********************************** Timer API ********************************* ****************************************************************************** */ // FlushTxs into consensus func (vm *VM) FlushTxs() { vm.timer.Cancel() if len(vm.txs) > 0 { select { case vm.toEngine <- common.PendingTxs: default: vm.ctx.Log.Warn("Dropping block due to too frequent issuance") vm.timer.SetTimeoutIn(batchTimeout) } } } /* ****************************************************************************** ******************************* Implementation ******************************* ****************************************************************************** */ // Consensus: func (vm *VM) initState(genesisBytes []byte) error { errs := wrappers.Errs{} vdb := versiondb.New(vm.baseDB) b := Builder{} block, err := b.NewBlock(ids.Empty, nil) errs.Add(err) c := Codec{} accounts, err := c.UnmarshalGenesis(genesisBytes) errs.Add(err) errs.Add(vm.state.SetBlock(vdb, block.ID(), block)) errs.Add(vm.state.SetStatus(vdb, block.ID(), choices.Accepted)) errs.Add(vm.state.SetLastAccepted(vdb, block.ID())) for _, account := range accounts { errs.Add(vm.state.SetAccount(vdb, account.ID().LongID(), account)) } errs.Add(vm.state.SetDBInitialized(vdb, choices.Processing)) if errs.Errored() { return errs.Err } return vdb.Commit() } func (vm *VM) issueTx(tx *Tx) { vm.ctx.Log.Verbo("Issuing tx:\n%s", formatting.DumpBytes{Bytes: tx.Bytes()}) vm.txs = append(vm.txs, tx) switch { case len(vm.txs) == maxBatchSize: vm.FlushTxs() case len(vm.txs) == 1: vm.timer.SetTimeoutIn(batchTimeout) } }