This commit is contained in:
StephenButtolph 2020-05-08 20:36:05 -04:00
commit 1b722ae5c0
35 changed files with 509 additions and 120 deletions

View File

@ -40,6 +40,7 @@ import (
const (
defaultChannelSize = 1000
requestTimeout = 2 * time.Second
gossipFrequency = 10 * time.Second
)
// Manager manages the chains running on this node.
@ -146,7 +147,7 @@ func New(
timeoutManager.Initialize(requestTimeout)
go log.RecoverAndPanic(timeoutManager.Dispatch)
router.Initialize(log, &timeoutManager)
router.Initialize(log, &timeoutManager, gossipFrequency)
m := &manager{
stakingEnabled: stakingEnabled,

View File

@ -12,6 +12,7 @@ import (
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/nodb"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/vms/components/codec"
)
@ -174,7 +175,7 @@ type batch struct {
}
func (b *batch) Put(key, value []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), copyBytes(value), false})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), utils.CopyBytes(value), false})
encValue, err := b.db.encrypt(value)
if err != nil {
return err
@ -183,7 +184,7 @@ func (b *batch) Put(key, value []byte) error {
}
func (b *batch) Delete(key []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), nil, true})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), nil, true})
return b.Batch.Delete(key)
}
@ -251,12 +252,6 @@ func (it *iterator) Error() error {
func (it *iterator) Value() []byte { return it.val }
func copyBytes(bytes []byte) []byte {
copiedBytes := make([]byte, len(bytes))
copy(copiedBytes, bytes)
return copiedBytes
}
type encryptedValue struct {
Ciphertext []byte `serialize:"true"`
Nonce []byte `serialize:"true"`

View File

@ -17,46 +17,42 @@ package database
// iterator until exhaustion. An iterator is not safe for concurrent use, but it
// is safe to use multiple iterators concurrently.
type Iterator interface {
// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is exhausted.
// Next moves the iterator to the next key/value pair. It returns whether
// the iterator is exhausted.
Next() bool
// Error returns any accumulated error. Exhausting all the key/value pairs
// is not considered to be an error.
Error() error
// Key returns the key of the current key/value pair, or nil if done. The caller
// should not modify the contents of the returned slice, and its contents may
// change on the next call to Next.
// Key returns the key of the current key/value pair, or nil if done.
Key() []byte
// Value returns the value of the current key/value pair, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
// Value returns the value of the current key/value pair, or nil if done.
Value() []byte
// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
// Release releases associated resources. Release should always succeed and
// can be called multiple times without causing error.
Release()
}
// Iteratee wraps the NewIterator methods of a backing data store.
type Iteratee interface {
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the key-value database.
// NewIterator creates a binary-alphabetical iterator over the entire
// keyspace contained within the key-value database.
NewIterator() Iterator
// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
// NewIteratorWithStart creates a binary-alphabetical iterator over a subset
// of database content starting at a particular initial key (or after, if it
// does not exist).
NewIteratorWithStart(start []byte) Iterator
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a
// subset of database content with a particular key prefix.
NewIteratorWithPrefix(prefix []byte) Iterator
// NewIteratorWithStartAndPrefix creates a binary-alphabetical iterator over a
// subset of database content with a particular key prefix starting at a
// NewIteratorWithStartAndPrefix creates a binary-alphabetical iterator over
// a subset of database content with a particular key prefix starting at a
// specified key.
NewIteratorWithStartAndPrefix(start, prefix []byte) Iterator
}

View File

@ -7,6 +7,7 @@ import (
"bytes"
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/utils"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
@ -208,7 +209,14 @@ func (r *replayer) Delete(key []byte) {
type iter struct{ iterator.Iterator }
func (i *iter) Error() error { return updateError(i.Iterator.Error()) }
// Error implements the Iterator interface
func (it *iter) Error() error { return updateError(it.Iterator.Error()) }
// Key implements the Iterator interface
func (it *iter) Key() []byte { return utils.CopyBytes(it.Iterator.Key()) }
// Value implements the Iterator interface
func (it *iter) Value() []byte { return utils.CopyBytes(it.Iterator.Value()) }
func updateError(err error) error {
switch err {

View File

@ -10,6 +10,7 @@ import (
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/nodb"
"github.com/ava-labs/gecko/utils"
)
// DefaultSize is the default initial size of the memory database
@ -62,7 +63,7 @@ func (db *Database) Get(key []byte) ([]byte, error) {
return nil, database.ErrClosed
}
if entry, ok := db.db[string(key)]; ok {
return copyBytes(entry), nil
return utils.CopyBytes(entry), nil
}
return nil, database.ErrNotFound
}
@ -75,7 +76,7 @@ func (db *Database) Put(key []byte, value []byte) error {
if db.db == nil {
return database.ErrClosed
}
db.db[string(key)] = copyBytes(value)
db.db[string(key)] = utils.CopyBytes(value)
return nil
}
@ -154,13 +155,13 @@ type batch struct {
}
func (b *batch) Put(key, value []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), copyBytes(value), false})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), utils.CopyBytes(value), false})
b.size += len(value)
return nil
}
func (b *batch) Delete(key []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), nil, true})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), nil, true})
b.size++
return nil
}
@ -253,9 +254,3 @@ func (it *iterator) Value() []byte {
// Release implements the Iterator interface
func (it *iterator) Release() { it.keys = nil; it.values = nil }
func copyBytes(bytes []byte) []byte {
copiedBytes := make([]byte, len(bytes))
copy(copiedBytes, bytes)
return copiedBytes
}

View File

@ -8,6 +8,7 @@ import (
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/nodb"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/hashing"
)
@ -174,13 +175,13 @@ type batch struct {
// Put implements the Batch interface
func (b *batch) Put(key, value []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), copyBytes(value), false})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), utils.CopyBytes(value), false})
return b.Batch.Put(b.db.prefix(key), value)
}
// Delete implements the Batch interface
func (b *batch) Delete(key []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), nil, true})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), nil, true})
return b.Batch.Delete(b.db.prefix(key))
}
@ -229,9 +230,3 @@ func (it *iterator) Key() []byte {
}
return key
}
func copyBytes(bytes []byte) []byte {
copiedBytes := make([]byte, len(bytes))
copy(copiedBytes, bytes)
return copiedBytes
}

View File

@ -11,6 +11,7 @@ import (
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/nodb"
"github.com/ava-labs/gecko/database/rpcdb/rpcdbproto"
"github.com/ava-labs/gecko/utils"
)
var (
@ -137,13 +138,13 @@ type batch struct {
}
func (b *batch) Put(key, value []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), copyBytes(value), false})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), utils.CopyBytes(value), false})
b.size += len(value)
return nil
}
func (b *batch) Delete(key []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), nil, true})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), nil, true})
b.size++
return nil
}
@ -246,12 +247,6 @@ func (it *iterator) Release() {
})
}
func copyBytes(bytes []byte) []byte {
copiedBytes := make([]byte, len(bytes))
copy(copiedBytes, bytes)
return copiedBytes
}
func updateError(err error) error {
if err == nil {
return nil

View File

@ -24,6 +24,7 @@ var (
TestIteratorStart,
TestIteratorPrefix,
TestIteratorStartPrefix,
TestIteratorMemorySafety,
TestIteratorClosed,
TestStatNoPanic,
TestCompactNoPanic,
@ -622,6 +623,63 @@ func TestIteratorStartPrefix(t *testing.T, db Database) {
}
}
// TestIteratorMemorySafety ...
func TestIteratorMemorySafety(t *testing.T, db Database) {
key1 := []byte("hello1")
value1 := []byte("world1")
key2 := []byte("z")
value2 := []byte("world2")
key3 := []byte("hello3")
value3 := []byte("world3")
if err := db.Put(key1, value1); err != nil {
t.Fatalf("Unexpected error on batch.Put: %s", err)
} else if err := db.Put(key2, value2); err != nil {
t.Fatalf("Unexpected error on batch.Put: %s", err)
} else if err := db.Put(key3, value3); err != nil {
t.Fatalf("Unexpected error on batch.Put: %s", err)
}
iterator := db.NewIterator()
if iterator == nil {
t.Fatalf("db.NewIterator returned nil")
}
defer iterator.Release()
keys := [][]byte{}
values := [][]byte{}
for iterator.Next() {
keys = append(keys, iterator.Key())
values = append(values, iterator.Value())
}
expectedKeys := [][]byte{
key1,
key3,
key2,
}
expectedValues := [][]byte{
value1,
value3,
value2,
}
for i, key := range keys {
value := values[i]
expectedKey := expectedKeys[i]
expectedValue := expectedValues[i]
if !bytes.Equal(key, expectedKey) {
t.Fatalf("Wrong key")
}
if !bytes.Equal(value, expectedValue) {
t.Fatalf("Wrong key")
}
}
}
// TestIteratorClosed ...
func TestIteratorClosed(t *testing.T, db Database) {
key1 := []byte("hello1")

View File

@ -11,6 +11,7 @@ import (
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/database/nodb"
"github.com/ava-labs/gecko/utils"
)
// Database implements the Database interface by living on top of another
@ -61,7 +62,7 @@ func (db *Database) Get(key []byte) ([]byte, error) {
if val.delete {
return nil, database.ErrNotFound
}
return copyBytes(val.value), nil
return utils.CopyBytes(val.value), nil
}
return db.db.Get(key)
}
@ -262,14 +263,14 @@ type batch struct {
// Put implements the Database interface
func (b *batch) Put(key, value []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), copyBytes(value), false})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), utils.CopyBytes(value), false})
b.size += len(value)
return nil
}
// Delete implements the Database interface
func (b *batch) Delete(key []byte) error {
b.writes = append(b.writes, keyValue{copyBytes(key), nil, true})
b.writes = append(b.writes, keyValue{utils.CopyBytes(key), nil, true})
b.size++
return nil
}
@ -414,9 +415,3 @@ func (it *iterator) Release() {
it.values = nil
it.Iterator.Release()
}
func copyBytes(bytes []byte) []byte {
copiedBytes := make([]byte, len(bytes))
copy(copiedBytes, bytes)
return copiedBytes
}

View File

@ -18,6 +18,7 @@ import "C"
import (
"errors"
"fmt"
"math"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
@ -29,9 +30,15 @@ import (
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/random"
"github.com/ava-labs/gecko/utils/timer"
)
// GossipSize is the maximum number of peers to gossip a container to
const (
GossipSize = 50
)
var (
// VotingNet implements the SenderExternal interface.
VotingNet = Voting{}
@ -89,34 +96,7 @@ func (s *Voting) Shutdown() { s.executor.Stop() }
// Accept is called after every consensus decision
func (s *Voting) Accept(chainID, containerID ids.ID, container []byte) error {
peers := []salticidae.PeerID(nil)
allPeers, allIDs, _ := s.conns.Conns()
for i, id := range allIDs {
if !s.vdrs.Contains(id) {
peers = append(peers, allPeers[i])
}
}
build := Builder{}
msg, err := build.Put(chainID, 0, containerID, container)
if err != nil {
return fmt.Errorf("Attempted to pack too large of a Put message.\nContainer length: %d: %w", len(container), err)
}
s.log.Verbo("Sending a Put message to non-validators."+
"\nNumber of Non-Validators: %d"+
"\nChain: %s"+
"\nContainer ID: %s"+
"\nContainer:\n%s",
len(peers),
chainID,
containerID,
formatting.DumpBytes{Bytes: container},
)
s.send(msg, peers...)
s.numPutSent.Add(float64(len(peers)))
return nil
return s.gossip(chainID, containerID, container)
}
// GetAcceptedFrontier implements the Sender interface.
@ -412,6 +392,13 @@ func (s *Voting) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32
s.numChitsSent.Inc()
}
// Gossip attempts to gossip the container to the network
func (s *Voting) Gossip(chainID, containerID ids.ID, container []byte) {
if err := s.gossip(chainID, containerID, container); err != nil {
s.log.Error("Error gossiping container %s to %s\n%s", containerID, chainID, err)
}
}
func (s *Voting) send(msg Msg, peers ...salticidae.PeerID) {
ds := msg.DataStream()
defer ds.Free()
@ -429,6 +416,41 @@ func (s *Voting) send(msg Msg, peers ...salticidae.PeerID) {
}
}
func (s *Voting) gossip(chainID, containerID ids.ID, container []byte) error {
allPeers := s.conns.PeerIDs()
numToGossip := GossipSize
if numToGossip > len(allPeers) {
numToGossip = len(allPeers)
}
peers := make([]salticidae.PeerID, numToGossip)
sampler := random.Uniform{N: len(allPeers)}
for i := range peers {
peers[i] = allPeers[sampler.Sample()]
}
build := Builder{}
msg, err := build.Put(chainID, math.MaxUint32, containerID, container)
if err != nil {
return fmt.Errorf("Attempted to pack too large of a Put message.\nContainer length: %d: %w", len(container), err)
}
s.log.Verbo("Sending a Put message to peers."+
"\nNumber of Peers: %d"+
"\nChain: %s"+
"\nContainer ID: %s"+
"\nContainer:\n%s",
len(peers),
chainID,
containerID,
formatting.DumpBytes{Bytes: container},
)
s.send(msg, peers...)
s.numPutSent.Add(float64(len(peers)))
return nil
}
// getAcceptedFrontier handles the recept of a getAcceptedFrontier container
// message for a chain
//export getAcceptedFrontier

View File

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -60,7 +61,7 @@ func newConfig(t *testing.T) (BootstrapConfig, ids.ShortID, *common.SenderTest,
handler.Initialize(engine, make(chan common.Message), 1)
timeouts.Initialize(0)
router.Initialize(ctx.Log, timeouts)
router.Initialize(ctx.Log, timeouts, time.Hour)
vtxBlocker, _ := queue.New(prefixdb.New([]byte("vtx"), db))
txBlocker, _ := queue.New(prefixdb.New([]byte("tx"), db))

View File

@ -64,6 +64,26 @@ func (t *Transitive) finishBootstrapping() {
t.bootstrapped = true
}
// Gossip implements the Engine interface
func (t *Transitive) Gossip() {
edge := t.Config.State.Edge()
if len(edge) == 0 {
t.Config.Context.Log.Debug("Dropping gossip request as no vertices have been accepted")
return
}
sampler := random.Uniform{N: len(edge)}
vtxID := edge[sampler.Sample()]
vtx, err := t.Config.State.GetVertex(vtxID)
if err != nil {
t.Config.Context.Log.Warn("Dropping gossip request as %s couldn't be loaded due to %s", vtxID, err)
return
}
t.Config.Context.Log.Debug("Gossiping %s as accepted to the network", vtxID)
t.Config.Sender.Gossip(vtxID, vtx.Bytes())
}
// Shutdown implements the Engine interface
func (t *Transitive) Shutdown() {
t.Config.Context.Log.Info("Shutting down Avalanche consensus")

View File

@ -2553,6 +2553,57 @@ func TestEnginePartiallyValidVertex(t *testing.T) {
te.insert(vtx)
}
func TestEngineGossip(t *testing.T) {
config := DefaultConfig()
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
st := &stateTest{t: t}
config.State = st
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
st.edge = func() []ids.ID { return []ids.ID{gVtx.ID()} }
st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
switch {
case vtxID.Equals(gVtx.ID()):
return gVtx, nil
}
t.Fatal(errUnknownVertex)
return nil, errUnknownVertex
}
called := new(bool)
sender.GossipF = func(vtxID ids.ID, vtxBytes []byte) {
*called = true
switch {
case !vtxID.Equals(gVtx.ID()):
t.Fatal(errUnknownVertex)
}
switch {
case !bytes.Equal(vtxBytes, gVtx.Bytes()):
t.Fatal(errUnknownVertex)
}
}
te.Gossip()
if !*called {
t.Fatalf("Should have gossiped the vertex")
}
}
func TestEngineInvalidVertexIgnoredFromUnexpectedPeer(t *testing.T) {
config := DefaultConfig()

View File

@ -221,6 +221,9 @@ type InternalHandler interface {
// able to run the engine.
Startup()
// Gossip to the network a container on the accepted frontier
Gossip()
// Shutdown this engine.
//
// This function will be called when the environment is exiting.

View File

@ -14,6 +14,7 @@ type Sender interface {
AcceptedSender
FetchSender
QuerySender
Gossiper
}
// FrontierSender defines how a consensus engine sends frontier messages to
@ -70,3 +71,10 @@ type QuerySender interface {
// Chits sends chits to the specified validator
Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set)
}
// Gossiper defines how a consensus engine gossips a container on the accepted
// frontier to other validators
type Gossiper interface {
// Gossip gossips the provided container throughout the network
Gossip(containerID ids.ID, container []byte)
}

View File

@ -15,6 +15,7 @@ type EngineTest struct {
T *testing.T
CantStartup,
CantGossip,
CantShutdown,
CantContext,
@ -38,7 +39,7 @@ type EngineTest struct {
CantQueryFailed,
CantChits bool
StartupF, ShutdownF func()
StartupF, GossipF, ShutdownF func()
ContextF func() *snow.Context
NotifyF func(Message)
GetF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
@ -51,6 +52,7 @@ type EngineTest struct {
// Default ...
func (e *EngineTest) Default(cant bool) {
e.CantStartup = cant
e.CantGossip = cant
e.CantShutdown = cant
e.CantContext = cant
@ -84,6 +86,15 @@ func (e *EngineTest) Startup() {
}
}
// Gossip ...
func (e *EngineTest) Gossip() {
if e.GossipF != nil {
e.GossipF()
} else if e.CantGossip && e.T != nil {
e.T.Fatalf("Unexpectedly called Gossip")
}
}
// Shutdown ...
func (e *EngineTest) Shutdown() {
if e.ShutdownF != nil {

View File

@ -16,7 +16,8 @@ type SenderTest struct {
CantGetAcceptedFrontier, CantAcceptedFrontier,
CantGetAccepted, CantAccepted,
CantGet, CantPut,
CantPullQuery, CantPushQuery, CantChits bool
CantPullQuery, CantPushQuery, CantChits,
CantGossip bool
GetAcceptedFrontierF func(ids.ShortSet, uint32)
AcceptedFrontierF func(ids.ShortID, uint32, ids.Set)
@ -27,6 +28,7 @@ type SenderTest struct {
PushQueryF func(ids.ShortSet, uint32, ids.ID, []byte)
PullQueryF func(ids.ShortSet, uint32, ids.ID)
ChitsF func(ids.ShortID, uint32, ids.Set)
GossipF func(ids.ID, []byte)
}
// Default set the default callable value to [cant]
@ -40,6 +42,7 @@ func (s *SenderTest) Default(cant bool) {
s.CantPullQuery = cant
s.CantPushQuery = cant
s.CantChits = cant
s.CantGossip = cant
}
// GetAcceptedFrontier calls GetAcceptedFrontierF if it was initialized. If it
@ -140,3 +143,14 @@ func (s *SenderTest) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
s.T.Fatalf("Unexpectedly called Chits")
}
}
// Gossip calls GossipF if it was initialized. If it wasn't initialized and this
// function shouldn't be called and testing was initialized, then testing will
// fail.
func (s *SenderTest) Gossip(containerID ids.ID, container []byte) {
if s.GossipF != nil {
s.GossipF(containerID, container)
} else if s.CantGossip && s.T != nil {
s.T.Fatalf("Unexpectedly called Gossip")
}
}

View File

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -54,7 +55,7 @@ func newConfig(t *testing.T) (BootstrapConfig, ids.ShortID, *common.SenderTest,
handler.Initialize(engine, make(chan common.Message), 1)
timeouts.Initialize(0)
router.Initialize(ctx.Log, timeouts)
router.Initialize(ctx.Log, timeouts, time.Hour)
blocker, _ := queue.New(db)

View File

@ -90,6 +90,19 @@ func (t *Transitive) finishBootstrapping() {
}
}
// Gossip implements the Engine interface
func (t *Transitive) Gossip() {
blkID := t.Config.VM.LastAccepted()
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil {
t.Config.Context.Log.Warn("Dropping gossip request as %s couldn't be loaded due to %s", blkID, err)
return
}
t.Config.Context.Log.Debug("Gossiping %s as accepted to the network", blkID)
t.Config.Sender.Gossip(blkID, blk.Bytes())
}
// Shutdown implements the Engine interface
func (t *Transitive) Shutdown() {
t.Config.Context.Log.Info("Shutting down Snowman consensus")

View File

@ -1203,6 +1203,39 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
}
}
func TestEngineGossip(t *testing.T) {
_, _, sender, vm, te, gBlk := setup(t)
vm.LastAcceptedF = func() ids.ID { return gBlk.ID() }
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(gBlk.ID()):
return gBlk, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
called := new(bool)
sender.GossipF = func(blkID ids.ID, blkBytes []byte) {
*called = true
switch {
case !blkID.Equals(gBlk.ID()):
t.Fatal(errUnknownBlock)
}
switch {
case !bytes.Equal(blkBytes, gBlk.Bytes()):
t.Fatal(errUnknownBytes)
}
}
te.Gossip()
if !*called {
t.Fatalf("Should have gossiped the block")
}
}
func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
vdr, vdrs, sender, vm, te, gBlk := setup(t)

View File

@ -91,6 +91,8 @@ func (h *Handler) dispatchMsg(msg message) bool {
h.engine.Chits(msg.validatorID, msg.requestID, msg.containerIDs)
case notifyMsg:
h.engine.Notify(msg.notification)
case gossipMsg:
h.engine.Gossip()
case shutdownMsg:
h.engine.Shutdown()
return false
@ -231,6 +233,9 @@ func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) {
}
}
// Gossip passes a gossip request to the consensus engine
func (h *Handler) Gossip() { h.msgs <- message{messageType: gossipMsg} }
// Shutdown shuts down the dispatcher
func (h *Handler) Shutdown() { h.msgs <- message{messageType: shutdownMsg}; h.wg.Wait() }

View File

@ -29,6 +29,7 @@ const (
chitsMsg
queryFailedMsg
notifyMsg
gossipMsg
shutdownMsg
)
@ -87,6 +88,8 @@ func (t msgType) String() string {
return "Query Failed Message"
case notifyMsg:
return "Notify Message"
case gossipMsg:
return "Gossip Message"
case shutdownMsg:
return "Shutdown Message"
default:

View File

@ -4,6 +4,8 @@
package router
import (
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/handler"
"github.com/ava-labs/gecko/snow/networking/timeout"
@ -19,7 +21,7 @@ type Router interface {
AddChain(chain *handler.Handler)
RemoveChain(chainID ids.ID)
Shutdown()
Initialize(log logging.Logger, timeouts *timeout.Manager)
Initialize(log logging.Logger, timeouts *timeout.Manager, gossipFrequency time.Duration)
}
// ExternalRouter routes messages from the network to the

View File

@ -5,11 +5,13 @@ package router
import (
"sync"
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/handler"
"github.com/ava-labs/gecko/snow/networking/timeout"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
)
// ChainRouter routes incoming messages from the validator network
@ -21,15 +23,24 @@ type ChainRouter struct {
lock sync.RWMutex
chains map[[32]byte]*handler.Handler
timeouts *timeout.Manager
gossiper *timer.Repeater
}
// Initialize the router
// When this router receives an incoming message, it cancels the timeout in [timeouts]
// associated with the request that caused the incoming message, if applicable
func (sr *ChainRouter) Initialize(log logging.Logger, timeouts *timeout.Manager) {
// Initialize the router.
//
// When this router receives an incoming message, it cancels the timeout in
// [timeouts] associated with the request that caused the incoming message, if
// applicable.
//
// This router also fires a gossip event every [gossipFrequency] to the engine,
// notifying the engine it should gossip it's accepted set.
func (sr *ChainRouter) Initialize(log logging.Logger, timeouts *timeout.Manager, gossipFrequency time.Duration) {
sr.log = log
sr.chains = make(map[[32]byte]*handler.Handler)
sr.timeouts = timeouts
sr.gossiper = timer.NewRepeater(sr.Gossip, gossipFrequency)
go log.RecoverAndPanic(sr.gossiper.Dispatch)
}
// AddChain registers the specified chain so that incoming
@ -255,4 +266,19 @@ func (sr *ChainRouter) shutdown() {
for _, chain := range sr.chains {
chain.Shutdown()
}
sr.gossiper.Stop()
}
// Gossip accepted containers
func (sr *ChainRouter) Gossip() {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.gossip()
}
func (sr *ChainRouter) gossip() {
for _, chain := range sr.chains {
chain.Gossip()
}
}

View File

@ -20,4 +20,6 @@ type ExternalSender interface {
PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID)
Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set)
Gossip(chainID ids.ID, containerID ids.ID, container []byte)
}

View File

@ -163,3 +163,9 @@ func (s *Sender) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set)
}
s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes)
}
// Gossip the provided container
func (s *Sender) Gossip(containerID ids.ID, container []byte) {
s.ctx.Log.Verbo("Gossiping %s", containerID)
s.sender.Gossip(s.ctx.ChainID, containerID, container)
}

View File

@ -38,7 +38,7 @@ func TestTimeout(t *testing.T) {
go tm.Dispatch()
router := router.ChainRouter{}
router.Initialize(logging.NoLog{}, &tm)
router.Initialize(logging.NoLog{}, &tm, time.Hour)
sender := Sender{}
sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &router, &tm)

View File

@ -17,7 +17,8 @@ type ExternalSenderTest struct {
CantGetAcceptedFrontier, CantAcceptedFrontier,
CantGetAccepted, CantAccepted,
CantGet, CantPut,
CantPullQuery, CantPushQuery, CantChits bool
CantPullQuery, CantPushQuery, CantChits,
CantGossip bool
GetAcceptedFrontierF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32)
AcceptedFrontierF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set)
@ -28,6 +29,7 @@ type ExternalSenderTest struct {
PushQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
PullQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID)
ChitsF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set)
GossipF func(chainID ids.ID, containerID ids.ID, container []byte)
}
// Default set the default callable value to [cant]
@ -41,6 +43,7 @@ func (s *ExternalSenderTest) Default(cant bool) {
s.CantPullQuery = cant
s.CantPushQuery = cant
s.CantChits = cant
s.CantGossip = cant
}
// GetAcceptedFrontier calls GetAcceptedFrontierF if it was initialized. If it
@ -159,3 +162,16 @@ func (s *ExternalSenderTest) Chits(vdr ids.ShortID, chainID ids.ID, requestID ui
s.B.Fatalf("Unexpectedly called Chits")
}
}
// Gossip calls GossipF if it was initialized. If it wasn't initialized and this
// function shouldn't be called and testing was initialized, then testing will
// fail.
func (s *ExternalSenderTest) Gossip(chainID ids.ID, containerID ids.ID, container []byte) {
if s.GossipF != nil {
s.GossipF(chainID, containerID, container)
} else if s.CantGossip && s.T != nil {
s.T.Fatalf("Unexpectedly called Gossip")
} else if s.CantGossip && s.B != nil {
s.B.Fatalf("Unexpectedly called Gossip")
}
}

16
utils/bytes.go Normal file
View File

@ -0,0 +1,16 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package utils
// CopyBytes returns a copy of the provided byte slice. If nil is provided, nil
// will be returned.
func CopyBytes(b []byte) []byte {
if b == nil {
return nil
}
cb := make([]byte, len(b))
copy(cb, b)
return cb
}

24
utils/bytes_test.go Normal file
View File

@ -0,0 +1,24 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package utils
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCopyBytesNil(t *testing.T) {
result := CopyBytes(nil)
assert.Nil(t, result, "CopyBytes(nil) should have returned nil")
}
func TestCopyBytes(t *testing.T) {
input := []byte{1}
result := CopyBytes(input)
assert.Equal(t, input, result, "CopyBytes should have returned equal bytes")
input[0] = 0
assert.NotEqual(t, input, result, "CopyBytes should have returned independent bytes")
}

View File

@ -7,6 +7,7 @@ import (
"bytes"
"errors"
"fmt"
"math"
"net/http"
"github.com/ava-labs/gecko/ids"
@ -15,7 +16,7 @@ import (
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/utils/json"
"github.com/ava-labs/gecko/utils/math"
safemath "github.com/ava-labs/gecko/utils/math"
"github.com/ava-labs/gecko/vms/components/ava"
"github.com/ava-labs/gecko/vms/components/verify"
"github.com/ava-labs/gecko/vms/secp256k1fx"
@ -221,7 +222,7 @@ func (service *Service) GetBalance(r *http.Request, args *GetBalanceArgs, reply
if !ok {
continue
}
amt, err := math.Add64(transferable.Amount(), uint64(reply.Balance))
amt, err := safemath.Add64(transferable.Amount(), uint64(reply.Balance))
if err != nil {
return err
}
@ -231,6 +232,78 @@ func (service *Service) GetBalance(r *http.Request, args *GetBalanceArgs, reply
return nil
}
// Balance ...
type Balance struct {
AssetID string `json:"asset"`
Balance json.Uint64 `json:"balance"`
}
// GetAllBalancesArgs are arguments for calling into GetAllBalances
type GetAllBalancesArgs struct {
Address string `json:"address"`
}
// GetAllBalancesReply is the response from a call to GetAllBalances
type GetAllBalancesReply struct {
Balances []Balance `json:"balances"`
}
// GetAllBalances returns a map where:
// Key: ID of an asset such that [args.Address] has a non-zero balance of the asset
// Value: The balance of the asset held by the address
// Note that balances include assets that the address only _partially_ owns
// (ie is one of several addresses specified in a multi-sig)
func (service *Service) GetAllBalances(r *http.Request, args *GetAllBalancesArgs, reply *GetAllBalancesReply) error {
service.vm.ctx.Log.Verbo("GetAllBalances called with address: %s", args.Address)
address, err := service.vm.Parse(args.Address)
if err != nil {
return fmt.Errorf("couldn't parse given address: %s", err)
}
addrAsSet := ids.Set{}
addrAsSet.Add(ids.NewID(hashing.ComputeHash256Array(address)))
utxos, err := service.vm.GetUTXOs(addrAsSet)
if err != nil {
return fmt.Errorf("couldn't get address's UTXOs: %s", err)
}
assetIDs := ids.Set{} // IDs of assets the address has a non-zero balance of
balances := make(map[[32]byte]uint64, 0) // key: ID (as bytes). value: balance of that asset
for _, utxo := range utxos {
transferable, ok := utxo.Out.(ava.Transferable)
if !ok {
continue
}
assetID := utxo.AssetID()
assetIDs.Add(assetID)
balance := balances[assetID.Key()] // 0 if key doesn't exist
balance, err := safemath.Add64(transferable.Amount(), balance)
if err != nil {
balances[assetID.Key()] = math.MaxUint64
} else {
balances[assetID.Key()] = balance
}
}
reply.Balances = make([]Balance, assetIDs.Len())
for i, assetID := range assetIDs.List() {
if alias, err := service.vm.PrimaryAlias(assetID); err == nil {
reply.Balances[i] = Balance{
AssetID: alias,
Balance: json.Uint64(balances[assetID.Key()]),
}
} else {
reply.Balances[i] = Balance{
AssetID: assetID.String(),
Balance: json.Uint64(balances[assetID.Key()]),
}
}
}
return nil
}
// CreateFixedCapAssetArgs are arguments for passing into CreateFixedCapAsset requests
type CreateFixedCapAssetArgs struct {
Username string `json:"username"`
@ -613,7 +686,7 @@ func (service *Service) Send(r *http.Request, args *SendArgs, reply *SendReply)
if !ok {
continue
}
spent, err := math.Add64(amountSpent, input.Amount())
spent, err := safemath.Add64(amountSpent, input.Amount())
if err != nil {
return errSpendOverflow
}
@ -1020,7 +1093,7 @@ func (service *Service) ImportAVA(_ *http.Request, args *ImportAVAArgs, reply *I
if !ok {
continue
}
spent, err := math.Add64(amount, input.Amount())
spent, err := safemath.Add64(amount, input.Amount())
if err != nil {
return errSpendOverflow
}
@ -1164,7 +1237,7 @@ func (service *Service) ExportAVA(_ *http.Request, args *ExportAVAArgs, reply *E
if !ok {
continue
}
spent, err := math.Add64(amountSpent, input.Amount())
spent, err := safemath.Add64(amountSpent, input.Amount())
if err != nil {
return errSpendOverflow
}

View File

@ -90,7 +90,9 @@ func (s *State) Status(id ids.ID) (choices.Status, error) {
}
var status choices.Status
s.Codec.Unmarshal(bytes, &status)
if err := s.Codec.Unmarshal(bytes, &status); err != nil {
return choices.Unknown, err
}
s.Cache.Put(id, status)
return status, nil
@ -103,12 +105,12 @@ func (s *State) SetStatus(id ids.ID, status choices.Status) error {
return s.DB.Delete(id.Bytes())
}
s.Cache.Put(id, status)
bytes, err := s.Codec.Marshal(status)
if err != nil {
return err
}
s.Cache.Put(id, status)
return s.DB.Put(id.Bytes(), bytes)
}
@ -142,12 +144,11 @@ func (s *State) SetIDs(id ids.ID, idSlice []ids.ID) error {
return s.DB.Delete(id.Bytes())
}
s.Cache.Put(id, idSlice)
bytes, err := s.Codec.Marshal(idSlice)
if err != nil {
return err
}
s.Cache.Put(id, idSlice)
return s.DB.Put(id.Bytes(), bytes)
}

View File

@ -1510,7 +1510,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
go timeoutManager.Dispatch()
router := &router.ChainRouter{}
router.Initialize(logging.NoLog{}, &timeoutManager)
router.Initialize(logging.NoLog{}, &timeoutManager, time.Hour)
externalSender := &sender.ExternalSenderTest{T: t}
externalSender.Default(true)

View File

@ -62,7 +62,7 @@ func ConsensusLeader(numBlocks, numTxsPerBlock int, b *testing.B) {
go timeoutManager.Dispatch()
router := &router.ChainRouter{}
router.Initialize(logging.NoLog{}, &timeoutManager)
router.Initialize(logging.NoLog{}, &timeoutManager, time.Hour)
// Initialize the VM
vm := &VM{}
@ -189,7 +189,7 @@ func ConsensusFollower(numBlocks, numTxsPerBlock int, b *testing.B) {
go timeoutManager.Dispatch()
router := &router.ChainRouter{}
router.Initialize(logging.NoLog{}, &timeoutManager)
router.Initialize(logging.NoLog{}, &timeoutManager, time.Hour)
wg := sync.WaitGroup{}
wg.Add(numBlocks)

View File

@ -130,8 +130,6 @@ func (s *state) SetStatus(id ids.ID, status choices.Status) error {
return s.vm.db.Delete(id.Bytes())
}
s.c.Put(id, status)
p := wrappers.Packer{Bytes: make([]byte, 4)}
p.PackInt(uint32(status))
@ -143,6 +141,8 @@ func (s *state) SetStatus(id ids.ID, status choices.Status) error {
if p.Errored() {
return p.Err
}
s.c.Put(id, status)
return s.vm.db.Put(id.Bytes(), p.Bytes)
}
@ -186,8 +186,6 @@ func (s *state) SetIDs(id ids.ID, idSlice []ids.ID) error {
return s.vm.db.Delete(id.Bytes())
}
s.c.Put(id, idSlice)
size := wrappers.IntLen + hashing.HashLen*len(idSlice)
p := wrappers.Packer{Bytes: make([]byte, size)}
@ -203,5 +201,7 @@ func (s *state) SetIDs(id ids.ID, idSlice []ids.ID) error {
if p.Errored() {
return p.Err
}
s.c.Put(id, idSlice)
return s.vm.db.Put(id.Bytes(), p.Bytes)
}