gecko/snow/engine/avalanche/transitive.go

513 lines
15 KiB
Go
Raw Normal View History

2020-03-10 12:20:34 -07:00
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package avalanche
import (
"time"
2020-03-10 12:20:34 -07:00
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/network"
2020-03-10 12:20:34 -07:00
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/choices"
2020-03-10 12:20:34 -07:00
"github.com/ava-labs/gecko/snow/consensus/avalanche"
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/events"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/random"
2020-05-28 20:48:08 -07:00
"github.com/ava-labs/gecko/utils/wrappers"
2020-03-10 12:20:34 -07:00
)
const (
// TODO define this constant in one place rather than here and in snowman
// Max containers size in a MultiPut message
maxContainersLen = int(4 * network.DefaultMaxMessageSize / 5)
)
2020-03-10 12:20:34 -07:00
// Transitive implements the Engine interface by attempting to fetch all
// transitive dependencies.
type Transitive struct {
Config
bootstrapper
polls polls // track people I have asked for their preference
// vtxReqs prevents asking validators for the same vertex
vtxReqs common.Requests
2020-03-10 12:20:34 -07:00
// missingTxs tracks transaction that are missing
missingTxs, pending ids.Set
2020-03-10 12:20:34 -07:00
// vtxBlocked tracks operations that are blocked on vertices
// txBlocked tracks operations that are blocked on transactions
vtxBlocked, txBlocked events.Blocker
bootstrapped bool
2020-05-28 20:48:08 -07:00
errs wrappers.Errs
2020-03-10 12:20:34 -07:00
}
// Initialize implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) Initialize(config Config) error {
config.Context.Log.Info("Initializing consensus engine")
2020-03-10 12:20:34 -07:00
t.Config = config
t.metrics.Initialize(config.Context.Log, config.Params.Namespace, config.Params.Metrics)
t.onFinished = t.finishBootstrapping
t.polls.log = config.Context.Log
t.polls.numPolls = t.numPolls
t.polls.m = make(map[uint32]poll)
2020-05-28 20:48:08 -07:00
return t.bootstrapper.Initialize(config.BootstrapConfig)
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
func (t *Transitive) finishBootstrapping() error {
2020-03-10 12:20:34 -07:00
// Load the vertices that were last saved as the accepted frontier
frontier := []avalanche.Vertex(nil)
for _, vtxID := range t.Config.State.Edge() {
if vtx, err := t.Config.State.GetVertex(vtxID); err == nil {
frontier = append(frontier, vtx)
} else {
t.Config.Context.Log.Error("vertex %s failed to be loaded from the frontier with %s", vtxID, err)
2020-03-10 12:20:34 -07:00
}
}
t.Consensus.Initialize(t.Config.Context, t.Params, frontier)
t.bootstrapped = true
t.Config.Context.Log.Info("bootstrapping finished with %d vertices in the accepted frontier", len(frontier))
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
2020-05-03 23:32:10 -07:00
// Gossip implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) Gossip() error {
2020-05-03 23:32:10 -07:00
edge := t.Config.State.Edge()
if len(edge) == 0 {
t.Config.Context.Log.Verbo("dropping gossip request as no vertices have been accepted")
2020-05-28 20:48:08 -07:00
return nil
2020-05-03 23:32:10 -07:00
}
sampler := random.Uniform{N: len(edge)}
vtxID := edge[sampler.Sample()]
vtx, err := t.Config.State.GetVertex(vtxID)
if err != nil {
t.Config.Context.Log.Warn("dropping gossip request as %s couldn't be loaded due to: %s", vtxID, err)
2020-05-28 20:48:08 -07:00
return nil
2020-05-03 23:32:10 -07:00
}
t.Config.Context.Log.Verbo("gossiping %s as accepted to the network", vtxID)
2020-05-03 23:32:10 -07:00
t.Config.Sender.Gossip(vtxID, vtx.Bytes())
2020-05-28 20:48:08 -07:00
return nil
2020-05-03 23:32:10 -07:00
}
2020-03-10 12:20:34 -07:00
// Shutdown implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) Shutdown() error {
t.Config.Context.Log.Info("shutting down consensus engine")
2020-05-28 20:48:08 -07:00
return t.Config.VM.Shutdown()
2020-03-10 12:20:34 -07:00
}
// Context implements the Engine interface
//
// It returns the snow.Context held in the engine's Config.
func (t *Transitive) Context() *snow.Context {
	return t.Config.Context
}
// Get implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error {
2020-03-10 12:20:34 -07:00
// If this engine has access to the requested vertex, provide it
if vtx, err := t.Config.State.GetVertex(vtxID); err == nil {
t.Config.Sender.Put(vdr, requestID, vtxID, vtx.Bytes())
}
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
// GetAncestors implements the Engine interface
func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error {
startTime := time.Now()
t.Config.Context.Log.Verbo("GetAncestors(%s, %d, %s) called", vdr, requestID, vtxID)
vertex, err := t.Config.State.GetVertex(vtxID)
if err != nil || vertex.Status() == choices.Unknown {
t.Config.Context.Log.Verbo("dropping getAncestors")
return nil // Don't have the requested vertex. Drop message.
}
queue := make([]avalanche.Vertex, 1, common.MaxContainersPerMultiPut) // for BFS
queue[0] = vertex
ancestorsBytesLen := 0 // length, in bytes, of vertex and its ancestors
ancestorsBytes := make([][]byte, 0, common.MaxContainersPerMultiPut) // vertex and its ancestors in BFS order
visited := ids.Set{} // IDs of vertices that have been in queue before
visited.Add(vertex.ID())
for len(ancestorsBytes) < common.MaxContainersPerMultiPut && len(queue) > 0 && time.Since(startTime) < common.MaxTimeFetchingAncestors {
var vtx avalanche.Vertex
vtx, queue = queue[0], queue[1:] // pop
vtxBytes := vtx.Bytes()
2020-06-04 09:48:39 -07:00
// Ensure response size isn't too large. Include wrappers.IntLen because the size of the message
// is included with each container, and the size is repr. by an int.
if newLen := wrappers.IntLen + ancestorsBytesLen + len(vtxBytes); newLen < maxContainersLen {
ancestorsBytes = append(ancestorsBytes, vtxBytes)
ancestorsBytesLen = newLen
} else { // reached maximum response size
break
}
for _, parent := range vtx.Parents() {
if parent.Status() == choices.Unknown { // Don't have this vertex;ignore
continue
}
2020-06-04 06:54:04 -07:00
if parentID := parent.ID(); !visited.Contains(parentID) { // If already visited, ignore
queue = append(queue, parent)
visited.Add(parentID)
}
}
}
t.Config.Sender.MultiPut(vdr, requestID, ancestorsBytes)
return nil
}
2020-03-10 12:20:34 -07:00
// Put implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
t.Config.Context.Log.Verbo("Put(%s, %d, %s) called", vdr, requestID, vtxID)
2020-03-10 12:20:34 -07:00
if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
return nil
2020-03-10 12:20:34 -07:00
}
vtx, err := t.Config.State.ParseVertex(vtxBytes)
if err != nil {
t.Config.Context.Log.Debug("failed to parse vertex %s due to: %s", vtxID, err)
t.Config.Context.Log.Verbo("vertex:\n%s", formatting.DumpBytes{Bytes: vtxBytes})
2020-05-28 20:48:08 -07:00
return t.GetFailed(vdr, requestID)
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
_, err = t.insertFrom(vdr, vtx)
return err
2020-03-10 12:20:34 -07:00
}
// GetFailed implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid
t.Config.Context.Log.Debug("dropping GetFailed(%s, %d) due to bootstrapping", vdr, requestID)
return nil
}
vtxID, ok := t.vtxReqs.Remove(vdr, requestID)
if !ok {
2020-06-04 07:00:29 -07:00
t.Config.Context.Log.Debug("GetFailed(%s, %d) called without having sent corresponding Get", vdr, requestID)
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
t.vtxBlocked.Abandon(vtxID)
if t.vtxReqs.Len() == 0 {
for _, txID := range t.missingTxs.List() {
t.txBlocked.Abandon(txID)
}
t.missingTxs.Clear()
}
// Track performance statistics
t.numVtxRequests.Set(float64(t.vtxReqs.Len()))
t.numTxRequests.Set(float64(t.missingTxs.Len()))
2020-05-28 20:48:08 -07:00
return t.errs.Err
2020-03-10 12:20:34 -07:00
}
// PullQuery implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error {
2020-03-10 12:20:34 -07:00
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping PullQuery(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
c := &convincer{
consensus: t.Consensus,
sender: t.Config.Sender,
vdr: vdr,
requestID: requestID,
2020-05-28 20:48:08 -07:00
errs: &t.errs,
}
added, err := t.reinsertFrom(vdr, vtxID)
if err != nil {
return err
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
if !added {
2020-03-10 12:20:34 -07:00
c.deps.Add(vtxID)
}
t.vtxBlocked.Register(c)
2020-05-28 20:48:08 -07:00
return t.errs.Err
2020-03-10 12:20:34 -07:00
}
// PushQuery implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
2020-03-10 12:20:34 -07:00
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping PushQuery(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
vtx, err := t.Config.State.ParseVertex(vtxBytes)
if err != nil {
t.Config.Context.Log.Debug("failed to parse vertex %s due to: %s", vtxID, err)
t.Config.Context.Log.Verbo("vertex:\n%s", formatting.DumpBytes{Bytes: vtxBytes})
2020-05-28 20:48:08 -07:00
return nil
}
if _, err := t.insertFrom(vdr, vtx); err != nil {
return err
}
2020-05-28 20:48:08 -07:00
return t.PullQuery(vdr, requestID, vtx.ID())
2020-03-10 12:20:34 -07:00
}
// Chits implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) error {
2020-03-10 12:20:34 -07:00
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping Chits(%s, %d) due to bootstrapping", vdr, requestID)
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
v := &voter{
t: t,
vdr: vdr,
requestID: requestID,
response: votes,
}
voteList := votes.List()
for _, vote := range voteList {
2020-05-28 20:48:08 -07:00
if added, err := t.reinsertFrom(vdr, vote); err != nil {
return err
} else if !added {
2020-03-10 12:20:34 -07:00
v.deps.Add(vote)
}
}
t.vtxBlocked.Register(v)
2020-05-28 20:48:08 -07:00
return t.errs.Err
2020-03-10 12:20:34 -07:00
}
// QueryFailed implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) error {
return t.Chits(vdr, requestID, ids.Set{})
2020-03-10 12:20:34 -07:00
}
// Notify implements the Engine interface
2020-05-28 20:48:08 -07:00
func (t *Transitive) Notify(msg common.Message) error {
2020-03-10 12:20:34 -07:00
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping Notify due to bootstrapping")
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
switch msg {
case common.PendingTxs:
txs := t.Config.VM.PendingTxs()
2020-05-28 20:48:08 -07:00
return t.batch(txs, false /*=force*/, false /*=empty*/)
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
func (t *Transitive) repoll() error {
if len(t.polls.m) >= t.Params.ConcurrentRepolls || t.errs.Errored() {
return nil
}
2020-03-10 12:20:34 -07:00
txs := t.Config.VM.PendingTxs()
2020-05-28 20:48:08 -07:00
if err := t.batch(txs, false /*=force*/, true /*=empty*/); err != nil {
return err
}
for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ {
2020-05-28 20:48:08 -07:00
if err := t.batch(nil, false /*=force*/, true /*=empty*/); err != nil {
return err
}
}
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
func (t *Transitive) reinsertFrom(vdr ids.ShortID, vtxID ids.ID) (bool, error) {
2020-03-10 12:20:34 -07:00
vtx, err := t.Config.State.GetVertex(vtxID)
if err != nil {
t.sendRequest(vdr, vtxID)
2020-05-28 20:48:08 -07:00
return false, nil
2020-03-10 12:20:34 -07:00
}
return t.insertFrom(vdr, vtx)
}
2020-05-28 20:48:08 -07:00
func (t *Transitive) insertFrom(vdr ids.ShortID, vtx avalanche.Vertex) (bool, error) {
2020-03-10 12:20:34 -07:00
issued := true
vts := []avalanche.Vertex{vtx}
for len(vts) > 0 {
vtx := vts[0]
vts = vts[1:]
if t.Consensus.VertexIssued(vtx) {
continue
}
if t.pending.Contains(vtx.ID()) {
issued = false
continue
}
for _, parent := range vtx.Parents() {
if !parent.Status().Fetched() {
t.sendRequest(vdr, parent.ID())
issued = false
} else {
vts = append(vts, parent)
}
}
2020-05-28 20:48:08 -07:00
if err := t.insert(vtx); err != nil {
return false, err
}
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
return issued, nil
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
func (t *Transitive) insert(vtx avalanche.Vertex) error {
2020-03-10 12:20:34 -07:00
vtxID := vtx.ID()
t.pending.Add(vtxID)
t.vtxReqs.RemoveAny(vtxID)
2020-03-10 12:20:34 -07:00
i := &issuer{
t: t,
vtx: vtx,
}
for _, parent := range vtx.Parents() {
if !t.Consensus.VertexIssued(parent) {
i.vtxDeps.Add(parent.ID())
}
}
txs := vtx.Txs()
txIDs := ids.Set{}
for _, tx := range txs {
txIDs.Add(tx.ID())
}
for _, tx := range txs {
for _, dep := range tx.Dependencies() {
depID := dep.ID()
if !txIDs.Contains(depID) && !t.Consensus.TxIssued(dep) {
t.missingTxs.Add(depID)
i.txDeps.Add(depID)
}
}
}
t.Config.Context.Log.Verbo("vertex %s is blocking on %d vertices and %d transactions", vtxID, i.vtxDeps.Len(), i.txDeps.Len())
2020-03-10 12:20:34 -07:00
t.vtxBlocked.Register(&vtxIssuer{i: i})
t.txBlocked.Register(&txIssuer{i: i})
if t.vtxReqs.Len() == 0 {
for _, txID := range t.missingTxs.List() {
t.txBlocked.Abandon(txID)
}
t.missingTxs.Clear()
}
// Track performance statistics
t.numVtxRequests.Set(float64(t.vtxReqs.Len()))
t.numTxRequests.Set(float64(t.missingTxs.Len()))
2020-05-20 08:37:01 -07:00
t.numPendingVtx.Set(float64(t.pending.Len()))
2020-05-28 20:48:08 -07:00
return t.errs.Err
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
func (t *Transitive) batch(txs []snowstorm.Tx, force, empty bool) error {
2020-03-10 12:20:34 -07:00
batch := []snowstorm.Tx(nil)
issuedTxs := ids.Set{}
consumed := ids.Set{}
issued := false
orphans := t.Consensus.Orphans()
2020-03-10 12:20:34 -07:00
for _, tx := range txs {
inputs := tx.InputIDs()
overlaps := consumed.Overlaps(inputs)
if len(batch) >= t.Params.BatchSize || (force && overlaps) {
t.issueBatch(batch)
batch = nil
consumed.Clear()
issued = true
overlaps = false
}
if txID := tx.ID(); !overlaps && // should never allow conflicting txs in the same vertex
!issuedTxs.Contains(txID) && // shouldn't issue duplicated transactions to the same vertex
(force || t.Consensus.IsVirtuous(tx)) && // force allows for a conflict to be issued
(!t.Consensus.TxIssued(tx) || orphans.Contains(txID)) { // should only reissued orphaned txs
2020-03-10 12:20:34 -07:00
batch = append(batch, tx)
issuedTxs.Add(txID)
consumed.Union(inputs)
}
}
2020-04-23 22:53:02 -07:00
if len(batch) > 0 {
2020-05-28 20:48:08 -07:00
return t.issueBatch(batch)
2020-04-23 22:53:02 -07:00
} else if empty && !issued {
t.issueRepoll()
}
2020-05-28 20:48:08 -07:00
return nil
2020-04-23 22:53:02 -07:00
}
func (t *Transitive) issueRepoll() {
preferredIDs := t.Consensus.Preferences().List()
numPreferredIDs := len(preferredIDs)
if numPreferredIDs == 0 {
t.Config.Context.Log.Error("re-query attempt was dropped due to no pending vertices")
2020-04-23 22:53:02 -07:00
return
}
sampler := random.Uniform{N: len(preferredIDs)}
vtxID := preferredIDs[sampler.Sample()]
p := t.Consensus.Parameters()
vdrs := t.Config.Validators.Sample(p.K) // Validators to sample
vdrSet := ids.ShortSet{} // Validators to sample repr. as a set
for _, vdr := range vdrs {
vdrSet.Add(vdr.ID())
}
t.RequestID++
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Sender.PullQuery(vdrSet, t.RequestID, vtxID)
} else if numVdrs < p.K {
t.Config.Context.Log.Error("re-query for %s was dropped due to an insufficient number of validators", vtxID)
2020-03-10 12:20:34 -07:00
}
}
2020-05-28 20:48:08 -07:00
func (t *Transitive) issueBatch(txs []snowstorm.Tx) error {
t.Config.Context.Log.Verbo("batching %d transactions into a new vertex", len(txs))
2020-03-10 12:20:34 -07:00
virtuousIDs := t.Consensus.Virtuous().List()
sampler := random.Uniform{N: len(virtuousIDs)}
parentIDs := ids.Set{}
for i := 0; i < t.Params.Parents && sampler.CanSample(); i++ {
parentIDs.Add(virtuousIDs[sampler.Sample()])
}
2020-05-28 20:48:08 -07:00
vtx, err := t.Config.State.BuildVertex(parentIDs, txs)
if err != nil {
t.Config.Context.Log.Warn("error building new vertex with %d parents and %d transactions", len(parentIDs), len(txs))
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
2020-05-28 20:48:08 -07:00
return t.insert(vtx)
2020-03-10 12:20:34 -07:00
}
func (t *Transitive) sendRequest(vdr ids.ShortID, vtxID ids.ID) {
if t.vtxReqs.Contains(vtxID) {
t.Config.Context.Log.Debug("not requesting a vertex because we have recently sent a request")
2020-03-10 12:20:34 -07:00
return
}
t.RequestID++
t.vtxReqs.Add(vdr, t.RequestID, vtxID)
2020-03-10 12:20:34 -07:00
t.Config.Sender.Get(vdr, t.RequestID, vtxID)
t.numVtxRequests.Set(float64(t.vtxReqs.Len())) // Tracks performance statistics
2020-03-10 12:20:34 -07:00
}