mirror of https://github.com/poanetwork/gecko.git
525 lines
15 KiB
Go
525 lines
15 KiB
Go
|
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
|
||
|
// See the file LICENSE for licensing terms.
|
||
|
|
||
|
package chains
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/ava-labs/gecko/api"
|
||
|
"github.com/ava-labs/gecko/api/keystore"
|
||
|
"github.com/ava-labs/gecko/database"
|
||
|
"github.com/ava-labs/gecko/database/prefixdb"
|
||
|
"github.com/ava-labs/gecko/ids"
|
||
|
"github.com/ava-labs/gecko/snow"
|
||
|
"github.com/ava-labs/gecko/snow/consensus/snowball"
|
||
|
"github.com/ava-labs/gecko/snow/engine/avalanche"
|
||
|
"github.com/ava-labs/gecko/snow/engine/avalanche/state"
|
||
|
"github.com/ava-labs/gecko/snow/engine/common"
|
||
|
"github.com/ava-labs/gecko/snow/engine/common/queue"
|
||
|
"github.com/ava-labs/gecko/snow/networking"
|
||
|
"github.com/ava-labs/gecko/snow/networking/handler"
|
||
|
"github.com/ava-labs/gecko/snow/networking/router"
|
||
|
"github.com/ava-labs/gecko/snow/networking/sender"
|
||
|
"github.com/ava-labs/gecko/snow/networking/timeout"
|
||
|
"github.com/ava-labs/gecko/snow/triggers"
|
||
|
"github.com/ava-labs/gecko/snow/validators"
|
||
|
"github.com/ava-labs/gecko/utils/logging"
|
||
|
"github.com/ava-labs/gecko/vms"
|
||
|
|
||
|
avacon "github.com/ava-labs/gecko/snow/consensus/avalanche"
|
||
|
avaeng "github.com/ava-labs/gecko/snow/engine/avalanche"
|
||
|
|
||
|
smcon "github.com/ava-labs/gecko/snow/consensus/snowman"
|
||
|
smeng "github.com/ava-labs/gecko/snow/engine/snowman"
|
||
|
)
|
||
|
|
||
|
// Defaults applied to every chain created by this package.
const (
	// requestTimeout is the duration the shared timeout manager is
	// initialized with.
	requestTimeout = 2 * time.Second

	// defaultChannelSize is the buffer size of the channel a VM uses to send
	// messages to its consensus engine; the same value is also passed to each
	// chain's message handler when it is initialized.
	defaultChannelSize = 1000
)
|
||
|
|
||
|
// Manager manages the chains running on this node.
|
||
|
// It can:
|
||
|
// * Create a chain
|
||
|
// * Add a registrant. When a chain is created, each registrant calls
|
||
|
// RegisterChain with the new chain as the argument.
|
||
|
// * Get the aliases associated with a given chain.
|
||
|
// * Get the ID of the chain associated with a given alias.
|
||
|
type Manager interface {
|
||
|
// Return the router this Manager is using to route consensus messages to chains
|
||
|
Router() router.Router
|
||
|
|
||
|
// Create a chain in the future
|
||
|
CreateChain(ChainParameters)
|
||
|
|
||
|
// Create a chain now
|
||
|
ForceCreateChain(ChainParameters)
|
||
|
|
||
|
// Add a registrant [r]. Every time a chain is
|
||
|
// created, [r].RegisterChain([new chain]) is called
|
||
|
AddRegistrant(Registrant)
|
||
|
|
||
|
// Given an alias, return the ID of the chain associated with that alias
|
||
|
Lookup(string) (ids.ID, error)
|
||
|
|
||
|
// Given an alias, return the ID of the VM associated with that alias
|
||
|
LookupVM(string) (ids.ID, error)
|
||
|
|
||
|
// Return the aliases associated with a chain
|
||
|
Aliases(ids.ID) []string
|
||
|
|
||
|
// Add an alias to a chain
|
||
|
Alias(ids.ID, string) error
|
||
|
|
||
|
Shutdown()
|
||
|
}
|
||
|
|
||
|
// ChainParameters defines the chain being created
|
||
|
type ChainParameters struct {
|
||
|
ID ids.ID // The ID of the chain being created
|
||
|
SubnetID ids.ID // ID of the subnet that validates this chain
|
||
|
GenesisData []byte // The genesis data of this chain's ledger
|
||
|
VMAlias string // The ID of the vm this chain is running
|
||
|
FxAliases []string // The IDs of the feature extensions this chain is running
|
||
|
|
||
|
CustomBeacons validators.Set // Should only be set if the default beacons can't be used.
|
||
|
}
|
||
|
|
||
|
type manager struct {
|
||
|
// Note: The string representation of a chain's ID is also considered to be an alias of the chain
|
||
|
// That is, [chainID].String() is an alias for the chain, too
|
||
|
ids.Aliaser
|
||
|
|
||
|
log logging.Logger
|
||
|
logFactory logging.Factory
|
||
|
vmManager vms.Manager // Manage mappings from vm ID --> vm
|
||
|
decisionEvents *triggers.EventDispatcher
|
||
|
consensusEvents *triggers.EventDispatcher
|
||
|
db database.Database
|
||
|
chainRouter router.Router // Routes incoming messages to the appropriate chain
|
||
|
sender sender.ExternalSender // Sends consensus messages to other validators
|
||
|
timeoutManager *timeout.Manager // Manages request timeouts when sending messages to other validators
|
||
|
consensusParams avacon.Parameters // The consensus parameters (alpha, beta, etc.) for new chains
|
||
|
validators validators.Manager // Validators validating on this chain
|
||
|
registrants []Registrant // Those notified when a chain is created
|
||
|
nodeID ids.ShortID // The ID of this node
|
||
|
networkID uint32 // ID of the network this node is connected to
|
||
|
awaiter Awaiter // Waits for required connections before running bootstrapping
|
||
|
server *api.Server // Handles HTTP API calls
|
||
|
keystore *keystore.Keystore
|
||
|
|
||
|
unblocked bool
|
||
|
blockedChains []ChainParameters
|
||
|
}
|
||
|
|
||
|
// New returns a new Manager where:
|
||
|
// <db> is this node's database
|
||
|
// <sender> sends messages to other validators
|
||
|
// <validators> validate this chain
|
||
|
// TODO: Make this function take less arguments
|
||
|
func New(
|
||
|
log logging.Logger,
|
||
|
logFactory logging.Factory,
|
||
|
vmManager vms.Manager,
|
||
|
decisionEvents *triggers.EventDispatcher,
|
||
|
consensusEvents *triggers.EventDispatcher,
|
||
|
db database.Database,
|
||
|
router router.Router,
|
||
|
sender sender.ExternalSender,
|
||
|
consensusParams avacon.Parameters,
|
||
|
validators validators.Manager,
|
||
|
nodeID ids.ShortID,
|
||
|
networkID uint32,
|
||
|
awaiter Awaiter,
|
||
|
server *api.Server,
|
||
|
keystore *keystore.Keystore,
|
||
|
) Manager {
|
||
|
timeoutManager := timeout.Manager{}
|
||
|
timeoutManager.Initialize(requestTimeout)
|
||
|
go log.RecoverAndPanic(timeoutManager.Dispatch)
|
||
|
|
||
|
router.Initialize(log, &timeoutManager)
|
||
|
|
||
|
m := &manager{
|
||
|
log: log,
|
||
|
logFactory: logFactory,
|
||
|
vmManager: vmManager,
|
||
|
decisionEvents: decisionEvents,
|
||
|
consensusEvents: consensusEvents,
|
||
|
db: db,
|
||
|
chainRouter: router,
|
||
|
sender: sender,
|
||
|
timeoutManager: &timeoutManager,
|
||
|
consensusParams: consensusParams,
|
||
|
validators: validators,
|
||
|
nodeID: nodeID,
|
||
|
networkID: networkID,
|
||
|
awaiter: awaiter,
|
||
|
server: server,
|
||
|
keystore: keystore,
|
||
|
}
|
||
|
m.Initialize()
|
||
|
return m
|
||
|
}
|
||
|
|
||
|
// Router that this chain manager is using to route consensus messages to chains
|
||
|
func (m *manager) Router() router.Router { return m.chainRouter }
|
||
|
|
||
|
// Create a chain
|
||
|
func (m *manager) CreateChain(chain ChainParameters) {
|
||
|
if !m.unblocked {
|
||
|
m.blockedChains = append(m.blockedChains, chain)
|
||
|
} else {
|
||
|
m.ForceCreateChain(chain)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Create a chain
|
||
|
func (m *manager) ForceCreateChain(chain ChainParameters) {
|
||
|
m.log.Info("creating chain:\n"+
|
||
|
" ID: %s\n"+
|
||
|
" VMID:%s",
|
||
|
chain.ID,
|
||
|
chain.VMAlias,
|
||
|
)
|
||
|
|
||
|
// Assert that there isn't already a chain with an alias in [chain].Aliases
|
||
|
// (Recall that the string repr. of a chain's ID is also an alias for a chain)
|
||
|
if alias, isRepeat := m.isChainWithAlias(chain.ID.String()); isRepeat {
|
||
|
m.log.Error("there is already a chain with alias '%s'. Chain not created.", alias)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
vmID, err := m.vmManager.Lookup(chain.VMAlias)
|
||
|
if err != nil {
|
||
|
m.log.Error("error while looking up VM: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Get a factory for the vm we want to use on our chain
|
||
|
vmFactory, err := m.vmManager.GetVMFactory(vmID)
|
||
|
if err != nil {
|
||
|
m.log.Error("error while getting vmFactory: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Create the chain
|
||
|
vm := vmFactory.New()
|
||
|
|
||
|
fxs := make([]*common.Fx, len(chain.FxAliases))
|
||
|
for i, fxAlias := range chain.FxAliases {
|
||
|
fxID, err := m.vmManager.Lookup(fxAlias)
|
||
|
if err != nil {
|
||
|
m.log.Error("error while looking up Fx: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Get a factory for the fx we want to use on our chain
|
||
|
fxFactory, err := m.vmManager.GetVMFactory(fxID)
|
||
|
if err != nil {
|
||
|
m.log.Error("error while getting fxFactory: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Create the fx
|
||
|
fxs[i] = &common.Fx{
|
||
|
ID: fxID,
|
||
|
Fx: fxFactory.New(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Create the log and context of the chain
|
||
|
chainLog, err := m.logFactory.MakeChain(chain.ID, "")
|
||
|
if err != nil {
|
||
|
m.log.Error("error while creating chain's log %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
ctx := &snow.Context{
|
||
|
NetworkID: m.networkID,
|
||
|
ChainID: chain.ID,
|
||
|
Log: chainLog,
|
||
|
DecisionDispatcher: m.decisionEvents,
|
||
|
ConsensusDispatcher: m.consensusEvents,
|
||
|
NodeID: m.nodeID,
|
||
|
HTTP: m.server,
|
||
|
Keystore: m.keystore.NewBlockchainKeyStore(chain.ID),
|
||
|
BCLookup: m,
|
||
|
}
|
||
|
consensusParams := m.consensusParams
|
||
|
if alias, err := m.PrimaryAlias(ctx.ChainID); err == nil {
|
||
|
consensusParams.Namespace = fmt.Sprintf("gecko_%s", alias)
|
||
|
} else {
|
||
|
consensusParams.Namespace = fmt.Sprintf("gecko_%s", ctx.ChainID)
|
||
|
}
|
||
|
|
||
|
// The validators of this blockchain
|
||
|
validators, ok := m.validators.GetValidatorSet(ids.Empty) // TODO: Change argument to chain.SubnetID
|
||
|
if !ok {
|
||
|
m.log.Error("couldn't get validator set of subnet with ID %s. The subnet may not exist", chain.SubnetID)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
beacons := validators
|
||
|
if chain.CustomBeacons != nil {
|
||
|
beacons = chain.CustomBeacons
|
||
|
}
|
||
|
|
||
|
switch vm := vm.(type) {
|
||
|
case avalanche.DAGVM:
|
||
|
err := m.createAvalancheChain(
|
||
|
ctx,
|
||
|
chain.GenesisData,
|
||
|
validators,
|
||
|
beacons,
|
||
|
vm,
|
||
|
fxs,
|
||
|
consensusParams,
|
||
|
)
|
||
|
if err != nil {
|
||
|
m.log.Error("error while creating new avalanche vm %s", err)
|
||
|
return
|
||
|
}
|
||
|
case smeng.ChainVM:
|
||
|
err := m.createSnowmanChain(
|
||
|
ctx,
|
||
|
chain.GenesisData,
|
||
|
validators,
|
||
|
beacons,
|
||
|
vm,
|
||
|
fxs,
|
||
|
consensusParams.Parameters,
|
||
|
)
|
||
|
if err != nil {
|
||
|
m.log.Error("error while creating new snowman vm %s", err)
|
||
|
return
|
||
|
}
|
||
|
default:
|
||
|
m.log.Error("the vm should have type avalanche.DAGVM or snowman.ChainVM. Chain not created")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Associate the newly created chain with its default alias
|
||
|
m.log.AssertNoError(m.Alias(chain.ID, chain.ID.String()))
|
||
|
|
||
|
// Notify those that registered to be notified when a new chain is created
|
||
|
m.notifyRegistrants(ctx, vm)
|
||
|
}
|
||
|
|
||
|
// Implements Manager.AddRegistrant
|
||
|
func (m *manager) AddRegistrant(r Registrant) { m.registrants = append(m.registrants, r) }
|
||
|
|
||
|
func (m *manager) unblockChains() {
|
||
|
m.unblocked = true
|
||
|
blocked := m.blockedChains
|
||
|
m.blockedChains = nil
|
||
|
for _, chain := range blocked {
|
||
|
m.ForceCreateChain(chain)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Create a DAG-based blockchain that uses Avalanche
|
||
|
func (m *manager) createAvalancheChain(
|
||
|
ctx *snow.Context,
|
||
|
genesisData []byte,
|
||
|
validators,
|
||
|
beacons validators.Set,
|
||
|
vm avalanche.DAGVM,
|
||
|
fxs []*common.Fx,
|
||
|
consensusParams avacon.Parameters,
|
||
|
) error {
|
||
|
ctx.Lock.Lock()
|
||
|
defer ctx.Lock.Unlock()
|
||
|
|
||
|
db := prefixdb.New(ctx.ChainID.Bytes(), m.db)
|
||
|
vmDB := prefixdb.New([]byte("vm"), db)
|
||
|
vertexDB := prefixdb.New([]byte("vertex"), db)
|
||
|
vertexBootstrappingDB := prefixdb.New([]byte("vertex_bootstrapping"), db)
|
||
|
txBootstrappingDB := prefixdb.New([]byte("tx_bootstrapping"), db)
|
||
|
|
||
|
vtxBlocker, err := queue.New(vertexBootstrappingDB)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
txBlocker, err := queue.New(txBootstrappingDB)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// The channel through which a VM may send messages to the consensus engine
|
||
|
// VM uses this channel to notify engine that a block is ready to be made
|
||
|
msgChan := make(chan common.Message, defaultChannelSize)
|
||
|
|
||
|
if err := vm.Initialize(ctx, vmDB, genesisData, msgChan, fxs); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Handles serialization/deserialization of vertices and also the
|
||
|
// persistence of vertices
|
||
|
vtxState := &state.Serializer{}
|
||
|
vtxState.Initialize(ctx, vm, vertexDB)
|
||
|
|
||
|
// Passes messages from the consensus engine to the network
|
||
|
sender := sender.Sender{}
|
||
|
sender.Initialize(ctx, m.sender, m.chainRouter, m.timeoutManager)
|
||
|
|
||
|
// The engine handles consensus
|
||
|
engine := avaeng.Transitive{
|
||
|
Config: avaeng.Config{
|
||
|
BootstrapConfig: avaeng.BootstrapConfig{
|
||
|
Config: common.Config{
|
||
|
Context: ctx,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
engine.Initialize(avaeng.Config{
|
||
|
BootstrapConfig: avaeng.BootstrapConfig{
|
||
|
Config: common.Config{
|
||
|
Context: ctx,
|
||
|
Validators: validators,
|
||
|
Beacons: beacons,
|
||
|
Alpha: (beacons.Len() + 1) / 2,
|
||
|
Sender: &sender,
|
||
|
},
|
||
|
VtxBlocked: vtxBlocker,
|
||
|
TxBlocked: txBlocker,
|
||
|
State: vtxState,
|
||
|
VM: vm,
|
||
|
},
|
||
|
Params: consensusParams,
|
||
|
Consensus: &avacon.Topological{},
|
||
|
})
|
||
|
|
||
|
// Asynchronously passes messages from the network to the consensus engine
|
||
|
handler := &handler.Handler{}
|
||
|
handler.Initialize(&engine, msgChan, defaultChannelSize)
|
||
|
|
||
|
// Allows messages to be routed to the new chain
|
||
|
m.chainRouter.AddChain(handler)
|
||
|
go ctx.Log.RecoverAndPanic(handler.Dispatch)
|
||
|
|
||
|
awaiting := &networking.AwaitingConnections{
|
||
|
Finish: func() {
|
||
|
ctx.Lock.Lock()
|
||
|
defer ctx.Lock.Unlock()
|
||
|
|
||
|
engine.Startup()
|
||
|
},
|
||
|
}
|
||
|
for _, vdr := range beacons.List() {
|
||
|
awaiting.Requested.Add(vdr.ID())
|
||
|
}
|
||
|
awaiting.NumRequired = (3*awaiting.Requested.Len() + 3) / 4 // 75% must be connected to
|
||
|
m.awaiter.AwaitConnections(awaiting)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Create a linear chain using the Snowman consensus engine
|
||
|
func (m *manager) createSnowmanChain(
|
||
|
ctx *snow.Context,
|
||
|
genesisData []byte,
|
||
|
validators,
|
||
|
beacons validators.Set,
|
||
|
vm smeng.ChainVM,
|
||
|
fxs []*common.Fx,
|
||
|
consensusParams snowball.Parameters,
|
||
|
) error {
|
||
|
ctx.Lock.Lock()
|
||
|
defer ctx.Lock.Unlock()
|
||
|
|
||
|
db := prefixdb.New(ctx.ChainID.Bytes(), m.db)
|
||
|
vmDB := prefixdb.New([]byte("vm"), db)
|
||
|
bootstrappingDB := prefixdb.New([]byte("bootstrapping"), db)
|
||
|
|
||
|
blocked, err := queue.New(bootstrappingDB)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// The channel through which a VM may send messages to the consensus engine
|
||
|
// VM uses this channel to notify engine that a block is ready to be made
|
||
|
msgChan := make(chan common.Message, defaultChannelSize)
|
||
|
|
||
|
// Initialize the VM
|
||
|
if err := vm.Initialize(ctx, vmDB, genesisData, msgChan, fxs); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Passes messages from the consensus engine to the network
|
||
|
sender := sender.Sender{}
|
||
|
sender.Initialize(ctx, m.sender, m.chainRouter, m.timeoutManager)
|
||
|
|
||
|
// The engine handles consensus
|
||
|
engine := smeng.Transitive{}
|
||
|
engine.Initialize(smeng.Config{
|
||
|
BootstrapConfig: smeng.BootstrapConfig{
|
||
|
Config: common.Config{
|
||
|
Context: ctx,
|
||
|
Validators: validators,
|
||
|
Beacons: beacons,
|
||
|
Alpha: (beacons.Len() + 1) / 2,
|
||
|
Sender: &sender,
|
||
|
},
|
||
|
Blocked: blocked,
|
||
|
VM: vm,
|
||
|
Bootstrapped: m.unblockChains,
|
||
|
},
|
||
|
Params: consensusParams,
|
||
|
Consensus: &smcon.Topological{},
|
||
|
})
|
||
|
|
||
|
// Asynchronously passes messages from the network to the consensus engine
|
||
|
handler := &handler.Handler{}
|
||
|
handler.Initialize(&engine, msgChan, defaultChannelSize)
|
||
|
|
||
|
// Allow incoming messages to be routed to the new chain
|
||
|
m.chainRouter.AddChain(handler)
|
||
|
go ctx.Log.RecoverAndPanic(handler.Dispatch)
|
||
|
|
||
|
awaiting := &networking.AwaitingConnections{
|
||
|
Finish: func() {
|
||
|
ctx.Lock.Lock()
|
||
|
defer ctx.Lock.Unlock()
|
||
|
|
||
|
engine.Startup()
|
||
|
},
|
||
|
}
|
||
|
for _, vdr := range beacons.List() {
|
||
|
awaiting.Requested.Add(vdr.ID())
|
||
|
}
|
||
|
awaiting.NumRequired = (3*awaiting.Requested.Len() + 3) / 4 // 75% must be connected to
|
||
|
m.awaiter.AwaitConnections(awaiting)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Shutdown stops all the chains
|
||
|
func (m *manager) Shutdown() { m.chainRouter.Shutdown() }
|
||
|
|
||
|
// LookupVM returns the ID of the VM associated with an alias
|
||
|
func (m *manager) LookupVM(alias string) (ids.ID, error) { return m.vmManager.Lookup(alias) }
|
||
|
|
||
|
// Notify registrants [those who want to know about the creation of chains]
|
||
|
// that the specified chain has been created
|
||
|
func (m *manager) notifyRegistrants(ctx *snow.Context, vm interface{}) {
|
||
|
for _, registrant := range m.registrants {
|
||
|
registrant.RegisterChain(ctx, vm)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Returns:
|
||
|
// 1) the alias that already exists, or the empty string if there is none
|
||
|
// 2) true iff there exists a chain such that the chain has an alias in [aliases]
|
||
|
func (m *manager) isChainWithAlias(aliases ...string) (string, bool) {
|
||
|
for _, alias := range aliases {
|
||
|
if _, err := m.Lookup(alias); err == nil {
|
||
|
return alias, true
|
||
|
}
|
||
|
}
|
||
|
return "", false
|
||
|
}
|