Merge branch 'master' into polls-early-termination

commit 19d871538e
Stephen Buttolph, 2020-06-16 23:29:35 -04:00 (committed by GitHub)
29 changed files with 759 additions and 190 deletions

cache/lru_cache.go vendored (8 changes)

@ -10,6 +10,10 @@ import (
"github.com/ava-labs/gecko/ids"
)
const (
minCacheSize = 32
)
type entry struct {
Key ids.ID
Value interface{}
@ -59,7 +63,7 @@ func (c *LRU) Flush() {
func (c *LRU) init() {
if c.entryMap == nil {
c.entryMap = make(map[[32]byte]*list.Element)
c.entryMap = make(map[[32]byte]*list.Element, minCacheSize)
}
if c.entryList == nil {
c.entryList = list.New()
@ -134,6 +138,6 @@ func (c *LRU) evict(key ids.ID) {
func (c *LRU) flush() {
c.init()
c.entryMap = make(map[[32]byte]*list.Element)
c.entryMap = make(map[[32]byte]*list.Element, minCacheSize)
c.entryList = list.New()
}

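Aside, not part of the diff: the new minCacheSize is only a capacity hint for the backing map, not a bound on the cache. A minimal sketch of the pattern the init and flush changes rely on (the constant name mirrors the diff above; everything else is illustrative):

package main

import (
	"container/list"
	"fmt"
)

const minCacheSize = 32 // mirrors the constant added to lru_cache.go above

func main() {
	// The hint pre-allocates buckets so the map skips its first few
	// grow-and-rehash steps while the cache warms up; the map still
	// grows normally once it holds more than minCacheSize entries.
	entryMap := make(map[[32]byte]*list.Element, minCacheSize)
	entryList := list.New()

	var key [32]byte
	entryMap[key] = entryList.PushBack("value")
	fmt.Println(len(entryMap)) // 1
}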
cache/lru_cache_benchmark_test.go vendored (new file, 53 lines)

@ -0,0 +1,53 @@
package cache
import (
"crypto/rand"
"testing"
"github.com/ava-labs/gecko/ids"
)
func BenchmarkLRUCachePutSmall(b *testing.B) {
smallLen := 5
cache := &LRU{Size: smallLen}
for n := 0; n < b.N; n++ {
for i := 0; i < smallLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
cache.Put(ids.NewID(idBytes), n)
}
b.StopTimer()
cache.Flush()
b.StartTimer()
}
}
func BenchmarkLRUCachePutMedium(b *testing.B) {
mediumLen := 250
cache := &LRU{Size: mediumLen}
for n := 0; n < b.N; n++ {
for i := 0; i < mediumLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
cache.Put(ids.NewID(idBytes), n)
}
b.StopTimer()
cache.Flush()
b.StartTimer()
}
}
func BenchmarkLRUCachePutLarge(b *testing.B) {
largeLen := 10000
cache := &LRU{Size: largeLen}
for n := 0; n < b.N; n++ {
for i := 0; i < largeLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
cache.Put(ids.NewID(idBytes), n)
}
b.StopTimer()
cache.Flush()
b.StartTimer()
}
}

View File

@ -50,6 +50,122 @@ func (c *Config) init() error {
// Hard coded genesis constants
var (
EverestConfig = Config{
MintAddresses: []string{
"95YUFjhDG892VePMzpwKF9JzewGKvGRi3",
},
FundedAddresses: []string{
"9uKvvA7E35QCwLvAaohXTCfFejbf3Rv17",
"JLrYNMYXANGj43BfWXBxMMAEenUBp1Sbn",
"7TUTzwrU6nbZtWHjTHEpdneUvjKBxb3EM",
"77mPUXBdQKwQpPoX6rckCZGLGGdkuG1G6",
"4gGWdFZ4Gax1B466YKXyKRRpWLb42Afdt",
"CKTkzAPsRxCreyiDTnjGxLmjMarxF28fi",
"4ABm9gFHVtsNdcKSd1xsacFkGneSgzpaa",
"DpL8PTsrjtLzv5J8LL3D2A6YcnCTqrNH9",
"ZdhZv6oZrmXLyFDy6ovXAu6VxmbTsT2h",
"6cesTteH62Y5mLoDBUASaBvCXuL2AthL",
},
StakerIDs: []string{
"LQwRLm4cbJ7T2kxcxp4uXCU5XD8DFrE1C",
"hArafGhY2HFTbwaaVh1CSCUCUCiJ2Vfb",
"2m38qc95mhHXtrhjyGbe7r2NhniqHHJRB",
"4QBwET5o8kUhvt9xArhir4d3R25CtmZho",
"NpagUxt6KQiwPch9Sd4osv8kD1TZnkjdk",
},
EVMBytes: []byte{
0x7b, 0x22, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67,
0x22, 0x3a, 0x7b, 0x22, 0x63, 0x68, 0x61, 0x69,
0x6e, 0x49, 0x64, 0x22, 0x3a, 0x34, 0x33, 0x31,
0x31, 0x30, 0x2c, 0x22, 0x68, 0x6f, 0x6d, 0x65,
0x73, 0x74, 0x65, 0x61, 0x64, 0x42, 0x6c, 0x6f,
0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22, 0x64,
0x61, 0x6f, 0x46, 0x6f, 0x72, 0x6b, 0x42, 0x6c,
0x6f, 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22,
0x64, 0x61, 0x6f, 0x46, 0x6f, 0x72, 0x6b, 0x53,
0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x3a,
0x74, 0x72, 0x75, 0x65, 0x2c, 0x22, 0x65, 0x69,
0x70, 0x31, 0x35, 0x30, 0x42, 0x6c, 0x6f, 0x63,
0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22, 0x65, 0x69,
0x70, 0x31, 0x35, 0x30, 0x48, 0x61, 0x73, 0x68,
0x22, 0x3a, 0x22, 0x30, 0x78, 0x32, 0x30, 0x38,
0x36, 0x37, 0x39, 0x39, 0x61, 0x65, 0x65, 0x62,
0x65, 0x61, 0x65, 0x31, 0x33, 0x35, 0x63, 0x32,
0x34, 0x36, 0x63, 0x36, 0x35, 0x30, 0x32, 0x31,
0x63, 0x38, 0x32, 0x62, 0x34, 0x65, 0x31, 0x35,
0x61, 0x32, 0x63, 0x34, 0x35, 0x31, 0x33, 0x34,
0x30, 0x39, 0x39, 0x33, 0x61, 0x61, 0x63, 0x66,
0x64, 0x32, 0x37, 0x35, 0x31, 0x38, 0x38, 0x36,
0x35, 0x31, 0x34, 0x66, 0x30, 0x22, 0x2c, 0x22,
0x65, 0x69, 0x70, 0x31, 0x35, 0x35, 0x42, 0x6c,
0x6f, 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22,
0x65, 0x69, 0x70, 0x31, 0x35, 0x38, 0x42, 0x6c,
0x6f, 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22,
0x62, 0x79, 0x7a, 0x61, 0x6e, 0x74, 0x69, 0x75,
0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x3a,
0x30, 0x2c, 0x22, 0x63, 0x6f, 0x6e, 0x73, 0x74,
0x61, 0x6e, 0x74, 0x69, 0x6e, 0x6f, 0x70, 0x6c,
0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x3a,
0x30, 0x2c, 0x22, 0x70, 0x65, 0x74, 0x65, 0x72,
0x73, 0x62, 0x75, 0x72, 0x67, 0x42, 0x6c, 0x6f,
0x63, 0x6b, 0x22, 0x3a, 0x30, 0x7d, 0x2c, 0x22,
0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x22, 0x3a, 0x22,
0x30, 0x78, 0x30, 0x22, 0x2c, 0x22, 0x74, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22,
0x3a, 0x22, 0x30, 0x78, 0x30, 0x22, 0x2c, 0x22,
0x65, 0x78, 0x74, 0x72, 0x61, 0x44, 0x61, 0x74,
0x61, 0x22, 0x3a, 0x22, 0x30, 0x78, 0x30, 0x30,
0x22, 0x2c, 0x22, 0x67, 0x61, 0x73, 0x4c, 0x69,
0x6d, 0x69, 0x74, 0x22, 0x3a, 0x22, 0x30, 0x78,
0x35, 0x66, 0x35, 0x65, 0x31, 0x30, 0x30, 0x22,
0x2c, 0x22, 0x64, 0x69, 0x66, 0x66, 0x69, 0x63,
0x75, 0x6c, 0x74, 0x79, 0x22, 0x3a, 0x22, 0x30,
0x78, 0x30, 0x22, 0x2c, 0x22, 0x6d, 0x69, 0x78,
0x48, 0x61, 0x73, 0x68, 0x22, 0x3a, 0x22, 0x30,
0x78, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x22, 0x2c, 0x22, 0x63, 0x6f, 0x69, 0x6e,
0x62, 0x61, 0x73, 0x65, 0x22, 0x3a, 0x22, 0x30,
0x78, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x22, 0x2c, 0x22, 0x61, 0x6c, 0x6c, 0x6f,
0x63, 0x22, 0x3a, 0x7b, 0x22, 0x35, 0x37, 0x32,
0x66, 0x34, 0x64, 0x38, 0x30, 0x66, 0x31, 0x30,
0x66, 0x36, 0x36, 0x33, 0x62, 0x35, 0x30, 0x34,
0x39, 0x66, 0x37, 0x38, 0x39, 0x35, 0x34, 0x36,
0x66, 0x32, 0x35, 0x66, 0x37, 0x30, 0x62, 0x62,
0x36, 0x32, 0x61, 0x37, 0x66, 0x22, 0x3a, 0x7b,
0x22, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65,
0x22, 0x3a, 0x22, 0x30, 0x78, 0x33, 0x33, 0x62,
0x32, 0x65, 0x33, 0x63, 0x39, 0x66, 0x64, 0x30,
0x38, 0x30, 0x34, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x22, 0x7d, 0x7d, 0x2c,
0x22, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22,
0x3a, 0x22, 0x30, 0x78, 0x30, 0x22, 0x2c, 0x22,
0x67, 0x61, 0x73, 0x55, 0x73, 0x65, 0x64, 0x22,
0x3a, 0x22, 0x30, 0x78, 0x30, 0x22, 0x2c, 0x22,
0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x48, 0x61,
0x73, 0x68, 0x22, 0x3a, 0x22, 0x30, 0x78, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30,
0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x22,
0x7d,
},
}
DenaliConfig = Config{
MintAddresses: []string{
"95YUFjhDG892VePMzpwKF9JzewGKvGRi3",
@ -393,6 +509,8 @@ var (
// GetConfig ...
func GetConfig(networkID uint32) *Config {
switch networkID {
case EverestID:
return &EverestConfig
case DenaliID:
return &DenaliConfig
case CascadeID:

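Aside, not part of the diff: the EVMBytes literal above is plain ASCII; it is the JSON genesis handed to the EVM chain (chainId 43110, a single prefunded balance, zero difficulty, and so on). A sketch that makes the opening bytes readable, copying only the first ten bytes of the literal:

package main

import "fmt"

func main() {
	// First ten bytes of EverestConfig.EVMBytes as listed above.
	evmBytes := []byte{0x7b, 0x22, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x3a}
	fmt.Println(string(evmBytes)) // prints: {"config":
}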
View File

@ -23,8 +23,11 @@ func TestNetworkName(t *testing.T) {
if name := NetworkName(DenaliID); name != DenaliName {
t.Fatalf("NetworkID was incorrectly named. Result: %s ; Expected: %s", name, DenaliName)
}
if name := NetworkName(TestnetID); name != DenaliName {
t.Fatalf("NetworkID was incorrectly named. Result: %s ; Expected: %s", name, DenaliName)
if name := NetworkName(EverestID); name != EverestName {
t.Fatalf("NetworkID was incorrectly named. Result: %s ; Expected: %s", name, EverestName)
}
if name := NetworkName(TestnetID); name != EverestName {
t.Fatalf("NetworkID was incorrectly named. Result: %s ; Expected: %s", name, EverestName)
}
if name := NetworkName(4294967295); name != "network-4294967295" {
t.Fatalf("NetworkID was incorrectly named. Result: %s ; Expected: %s", name, "network-4294967295")

View File

@ -16,13 +16,15 @@ var (
MainnetID uint32 = 1
CascadeID uint32 = 2
DenaliID uint32 = 3
EverestID uint32 = 4
TestnetID uint32 = 3
TestnetID uint32 = 4
LocalID uint32 = 12345
MainnetName = "mainnet"
CascadeName = "cascade"
DenaliName = "denali"
EverestName = "everest"
TestnetName = "testnet"
LocalName = "local"
@ -31,6 +33,7 @@ var (
MainnetID: MainnetName,
CascadeID: CascadeName,
DenaliID: DenaliName,
EverestID: EverestName,
LocalID: LocalName,
}
@ -38,6 +41,7 @@ var (
MainnetName: MainnetID,
CascadeName: CascadeID,
DenaliName: DenaliID,
EverestName: EverestID,
TestnetName: TestnetID,
LocalName: LocalID,

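Aside, not part of the diff: the practical effect of the constants change is that TestnetID now aliases EverestID, so any ID-keyed lookup resolves the testnet to the everest name and genesis (the updated test above asserts exactly this). A minimal sketch, assuming NetworkName is backed by the ID-to-name table shown in the diff and falls back to "network-<id>" for unknown IDs:

package main

import "fmt"

const (
	DenaliID  uint32 = 3
	EverestID uint32 = 4
	TestnetID uint32 = 4 // now an alias for EverestID
)

// A reduced stand-in for the ID-to-name table in the diff above.
var idToName = map[uint32]string{
	DenaliID:  "denali",
	EverestID: "everest",
}

func main() {
	fmt.Println(idToName[TestnetID]) // everest: the testnet now points at everest
}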
go.mod (1 change)

@ -33,6 +33,7 @@ require (
github.com/olekukonko/tablewriter v0.0.4 // indirect
github.com/pborman/uuid v1.2.0 // indirect
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/common v0.9.1
github.com/prometheus/tsdb v0.10.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/rs/cors v1.7.0

go.sum (4 changes)

@ -8,8 +8,10 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
@ -218,6 +220,7 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 h1:Oo2KZNP70KE0+IUJSidPj/BFS/RXNHmKIJOdckzml2E=
@ -337,6 +340,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -8,6 +8,10 @@ import (
"strings"
)
const (
minBagSize = 16
)
// Bag is a multiset of IDs.
//
// A bag has the ability to split and filter on it's bits for ease of use for
@ -25,7 +29,7 @@ type Bag struct {
func (b *Bag) init() {
if b.counts == nil {
b.counts = make(map[[32]byte]int)
b.counts = make(map[[32]byte]int, minBagSize)
}
}
@ -72,16 +76,21 @@ func (b *Bag) AddCount(id ID, count int) {
}
// Count returns the number of times the id has been added.
func (b *Bag) Count(id ID) int { return b.counts[*id.ID] }
func (b *Bag) Count(id ID) int {
b.init()
return b.counts[*id.ID]
}
// Len returns the number of times an id has been added.
func (b *Bag) Len() int { return b.size }
// List returns a list of all ids that have been added.
func (b *Bag) List() []ID {
idList := []ID(nil)
idList := make([]ID, len(b.counts), len(b.counts))
i := 0
for id := range b.counts {
idList = append(idList, NewID(id))
idList[i] = NewID(id)
i++
}
return idList
}

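Aside, not part of the diff: Bag.List, and the Set.List and ShortSet.List changes further down, switch from appending into a nil slice to writing into a slice pre-allocated at its final length, which removes the intermediate grow-and-copy steps append would perform. A minimal sketch of the two shapes, with strings standing in for ids.ID:

package main

import "fmt"

// listAppend is the old shape: start from nil and let append grow the slice.
func listAppend(counts map[string]int) []string {
	var out []string
	for k := range counts {
		out = append(out, k)
	}
	return out
}

// listPrealloc is the new shape: size the slice once and index into it.
func listPrealloc(counts map[string]int) []string {
	out := make([]string, len(counts))
	i := 0
	for k := range counts {
		out[i] = k
		i++
	}
	return out
}

func main() {
	counts := map[string]int{"a": 1, "b": 2, "c": 3}
	fmt.Println(len(listAppend(counts)), len(listPrealloc(counts))) // 3 3
}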
ids/bag_benchmark_test.go (new file, 53 lines)

@ -0,0 +1,53 @@
package ids
import (
"crypto/rand"
"testing"
)
//
func BenchmarkBagListSmall(b *testing.B) {
smallLen := 5
bag := Bag{}
for i := 0; i < smallLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
NewID(idBytes)
bag.Add(NewID(idBytes))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
bag.List()
}
}
func BenchmarkBagListMedium(b *testing.B) {
mediumLen := 25
bag := Bag{}
for i := 0; i < mediumLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
NewID(idBytes)
bag.Add(NewID(idBytes))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
bag.List()
}
}
func BenchmarkBagListLarge(b *testing.B) {
largeLen := 100000
bag := Bag{}
for i := 0; i < largeLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
NewID(idBytes)
bag.Add(NewID(idBytes))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
bag.List()
}
}

View File

@ -18,8 +18,8 @@ func TestBagAdd(t *testing.T) {
} else if count := bag.Count(id1); count != 0 {
t.Fatalf("Bag.Count returned %d expected %d", count, 0)
} else if size := bag.Len(); size != 0 {
t.Fatalf("Bag.Len returned %d expected %d", count, 0)
} else if list := bag.List(); list != nil {
t.Fatalf("Bag.Len returned %d elements expected %d", count, 0)
} else if list := bag.List(); len(list) != 0 {
t.Fatalf("Bag.List returned %v expected %v", list, nil)
} else if mode, freq := bag.Mode(); !mode.IsZero() {
t.Fatalf("Bag.Mode[0] returned %s expected %s", mode, ID{})

View File

@ -7,11 +7,19 @@ import (
"strings"
)
const (
// The minimum capacity of a set
minSetSize = 16
)
// Set is a set of IDs
type Set map[[32]byte]bool
func (ids *Set) init(size int) {
if *ids == nil {
if minSetSize > size {
size = minSetSize
}
*ids = make(map[[32]byte]bool, size)
}
}
@ -70,9 +78,11 @@ func (ids *Set) Clear() { *ids = nil }
// List converts this set into a list
func (ids Set) List() []ID {
idList := []ID(nil)
idList := make([]ID, ids.Len(), ids.Len())
i := 0
for id := range ids {
idList = append(idList, NewID(id))
idList[i] = NewID(id)
i++
}
return idList
}

ids/set_benchmark_test.go (new file, 53 lines)

@ -0,0 +1,53 @@
package ids
import (
"crypto/rand"
"testing"
)
//
func BenchmarkSetListSmall(b *testing.B) {
smallLen := 5
set := Set{}
for i := 0; i < smallLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
NewID(idBytes)
set.Add(NewID(idBytes))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
set.List()
}
}
func BenchmarkSetListMedium(b *testing.B) {
mediumLen := 25
set := Set{}
for i := 0; i < mediumLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
NewID(idBytes)
set.Add(NewID(idBytes))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
set.List()
}
}
func BenchmarkSetListLarge(b *testing.B) {
largeLen := 100000
set := Set{}
for i := 0; i < largeLen; i++ {
var idBytes [32]byte
rand.Read(idBytes[:])
NewID(idBytes)
set.Add(NewID(idBytes))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
set.List()
}
}

View File

@ -5,11 +5,18 @@ package ids
import "strings"
const (
minShortSetSize = 16
)
// ShortSet is a set of ShortIDs
type ShortSet map[[20]byte]bool
func (ids *ShortSet) init(size int) {
if *ids == nil {
if minShortSetSize > size {
size = minShortSetSize
}
*ids = make(map[[20]byte]bool, size)
}
}
@ -65,9 +72,11 @@ func (ids ShortSet) CappedList(size int) []ShortID {
// List converts this set into a list
func (ids ShortSet) List() []ShortID {
idList := make([]ShortID, len(ids))[:0]
idList := make([]ShortID, len(ids), len(ids))
i := 0
for id := range ids {
idList = append(idList, NewShortID(id))
idList[i] = NewShortID(id)
i++
}
return idList
}

View File

@ -8,12 +8,16 @@ import (
"strings"
)
const (
minUniqueBagSize = 16
)
// UniqueBag ...
type UniqueBag map[[32]byte]BitSet
func (b *UniqueBag) init() {
if *b == nil {
*b = make(map[[32]byte]BitSet)
*b = make(map[[32]byte]BitSet, minUniqueBagSize)
}
}

View File

@ -30,7 +30,7 @@ import (
)
const (
dbVersion = "v0.5.0"
dbVersion = "v0.6.0"
)
// Results of parsing the CLI

View File

@ -33,6 +33,12 @@ func (m Builder) PeerList(ipDescs []utils.IPDesc) (Msg, error) {
return m.Pack(PeerList, map[Field]interface{}{Peers: ipDescs})
}
// Ping message
func (m Builder) Ping() (Msg, error) { return m.Pack(Ping, nil) }
// Pong message
func (m Builder) Pong() (Msg, error) { return m.Pack(Pong, nil) }
// GetAcceptedFrontier message
func (m Builder) GetAcceptedFrontier(chainID ids.ID, requestID uint32) (Msg, error) {
return m.Pack(GetAcceptedFrontier, map[Field]interface{}{

View File

@ -132,6 +132,10 @@ func (op Op) String() string {
return "get_peerlist"
case PeerList:
return "peerlist"
case Ping:
return "ping"
case Pong:
return "pong"
case GetAcceptedFrontier:
return "get_accepted_frontier"
case AcceptedFrontier:
@ -166,22 +170,21 @@ const (
Version
GetPeerList
PeerList
Ping
Pong
// Bootstrapping:
GetAcceptedFrontier
AcceptedFrontier
GetAccepted
Accepted
GetAncestors
MultiPut
// Consensus:
Get
Put
PushQuery
PullQuery
Chits
// Bootstrapping:
// TODO: Move GetAncestors and MultiPut with the rest of the bootstrapping
// commands when we do non-backwards compatible upgrade
GetAncestors
MultiPut
)
// Defines the messages that can be sent/received with this network
@ -192,6 +195,8 @@ var (
Version: {NetworkID, NodeID, MyTime, IP, VersionStr},
GetPeerList: {},
PeerList: {Peers},
Ping: {},
Pong: {},
// Bootstrapping:
GetAcceptedFrontier: {ChainID, RequestID},
AcceptedFrontier: {ChainID, RequestID, ContainerIDs},

View File

@ -54,6 +54,7 @@ type metrics struct {
getVersion, version,
getPeerlist, peerlist,
ping, pong,
getAcceptedFrontier, acceptedFrontier,
getAccepted, accepted,
get, getAncestors, put, multiPut,
@ -78,6 +79,8 @@ func (m *metrics) initialize(registerer prometheus.Registerer) error {
errs.Add(m.version.initialize(Version, registerer))
errs.Add(m.getPeerlist.initialize(GetPeerList, registerer))
errs.Add(m.peerlist.initialize(PeerList, registerer))
errs.Add(m.ping.initialize(Ping, registerer))
errs.Add(m.pong.initialize(Pong, registerer))
errs.Add(m.getAcceptedFrontier.initialize(GetAcceptedFrontier, registerer))
errs.Add(m.acceptedFrontier.initialize(AcceptedFrontier, registerer))
errs.Add(m.getAccepted.initialize(GetAccepted, registerer))
@ -103,6 +106,10 @@ func (m *metrics) message(msgType Op) *messageMetrics {
return &m.getPeerlist
case PeerList:
return &m.peerlist
case Ping:
return &m.ping
case Pong:
return &m.pong
case GetAcceptedFrontier:
return &m.getAcceptedFrontier
case AcceptedFrontier:

View File

@ -43,6 +43,12 @@ const (
defaultGetVersionTimeout = 2 * time.Second
defaultAllowPrivateIPs = true
defaultGossipSize = 50
defaultPingPongTimeout = time.Minute
defaultPingFrequency = 3 * defaultPingPongTimeout / 4
// Request ID used when sending a Put message to gossip an accepted container
// (ie not sent in response to a Get)
GossipMsgRequestID = math.MaxUint32
)
// Network defines the functionality of the networking library.
@ -119,6 +125,8 @@ type network struct {
getVersionTimeout time.Duration
allowPrivateIPs bool
gossipSize int
pingPongTimeout time.Duration
pingFrequency time.Duration
executor timer.Executor
@ -180,6 +188,8 @@ func NewDefaultNetwork(
defaultGetVersionTimeout,
defaultAllowPrivateIPs,
defaultGossipSize,
defaultPingPongTimeout,
defaultPingFrequency,
)
}
@ -211,6 +221,8 @@ func NewNetwork(
getVersionTimeout time.Duration,
allowPrivateIPs bool,
gossipSize int,
pingPongTimeout time.Duration,
pingFrequency time.Duration,
) Network {
net := &network{
log: log,
@ -239,6 +251,8 @@ func NewNetwork(
getVersionTimeout: getVersionTimeout,
allowPrivateIPs: allowPrivateIPs,
gossipSize: gossipSize,
pingPongTimeout: pingPongTimeout,
pingFrequency: pingFrequency,
disconnectedIPs: make(map[string]struct{}),
connectedIPs: make(map[string]struct{}),
@ -705,7 +719,7 @@ func (n *network) Track(ip utils.IPDesc) {
// assumes the stateLock is not held.
func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte) error {
msg, err := n.b.Put(chainID, math.MaxUint32, containerID, container)
msg, err := n.b.Put(chainID, GossipMsgRequestID, containerID, container)
if err != nil {
return fmt.Errorf("attempted to pack too large of a Put message.\nContainer length: %d", len(container))
}

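Aside, not part of the diff: GossipMsgRequestID is exported so consensus engines can tell a gossiped Put apart from a Put that answers one of their Gets, which is how the avalanche and snowman transitive.go changes further down use it. A reduced sketch of that branch, with placeholder logging and none of the real engine types:

package main

import (
	"fmt"
	"math"
)

// Mirrors network.GossipMsgRequestID: gossip Puts carry the maximum request
// ID, which ordinary Get requests are assumed never to use.
const GossipMsgRequestID = math.MaxUint32

func handlePut(requestID uint32, bootstrapped bool) {
	if !bootstrapped {
		if requestID == GossipMsgRequestID {
			fmt.Println("dropping gossip Put quietly (Verbo)")
		} else {
			fmt.Println("dropping unexpected Put loudly (Debug)")
		}
		return
	}
	fmt.Println("processing Put")
}

func main() {
	handlePut(GossipMsgRequestID, false) // gossip that arrived while bootstrapping
	handlePut(7, false)                  // a Put we never asked for while bootstrapping
	handlePut(7, true)                   // normal path
}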
View File

@ -64,6 +64,24 @@ func (p *peer) Start() {
// Initially send the version to the peer
go p.Version()
go p.requestVersion()
go p.sendPings()
}
func (p *peer) sendPings() {
t := time.NewTicker(p.net.pingFrequency)
defer t.Stop()
for range t.C {
p.net.stateLock.Lock()
closed := p.closed
p.net.stateLock.Unlock()
if closed {
return
}
p.Ping()
}
}
// request the version from the peer until we get the version from them
@ -80,6 +98,7 @@ func (p *peer) requestVersion() {
if connected || closed {
return
}
p.GetVersion()
}
}
@ -88,6 +107,11 @@ func (p *peer) requestVersion() {
func (p *peer) ReadMessages() {
defer p.Close()
if err := p.conn.SetReadDeadline(p.net.clock.Time().Add(p.net.pingPongTimeout)); err != nil {
p.net.log.Verbo("error on setting the connection read timeout %s", err)
return
}
pendingBuffer := wrappers.Packer{}
readBuffer := make([]byte, 1<<10)
for {
@ -218,7 +242,15 @@ func (p *peer) send(msg Msg) bool {
// assumes the stateLock is not held
func (p *peer) handle(msg Msg) {
p.net.heartbeat()
atomic.StoreInt64(&p.lastReceived, p.net.clock.Time().Unix())
currentTime := p.net.clock.Time()
atomic.StoreInt64(&p.lastReceived, currentTime.Unix())
if err := p.conn.SetReadDeadline(currentTime.Add(p.net.pingPongTimeout)); err != nil {
p.net.log.Verbo("error on setting the connection read timeout %s, closing the connection", err)
p.Close()
return
}
op := msg.Op()
msgMetrics := p.net.message(op)
@ -235,6 +267,12 @@ func (p *peer) handle(msg Msg) {
case GetVersion:
p.getVersion(msg)
return
case Ping:
p.ping(msg)
return
case Pong:
p.pong(msg)
return
}
if !p.connected {
p.net.log.Debug("dropping message from %s because the connection hasn't been established yet", p.id)
@ -318,6 +356,12 @@ func (p *peer) GetPeerList() {
p.Send(msg)
}
// assumes the stateLock is not held
func (p *peer) SendPeerList() {
ips := p.net.validatorIPs()
p.PeerList(ips)
}
// assumes the stateLock is not held
func (p *peer) PeerList(peers []utils.IPDesc) {
msg, err := p.net.b.PeerList(peers)
@ -326,7 +370,28 @@ func (p *peer) PeerList(peers []utils.IPDesc) {
return
}
p.Send(msg)
return
}
// assumes the stateLock is not held
func (p *peer) Ping() {
msg, err := p.net.b.Ping()
p.net.log.AssertNoError(err)
if p.Send(msg) {
p.net.ping.numSent.Inc()
} else {
p.net.ping.numFailed.Inc()
}
}
// assumes the stateLock is not held
func (p *peer) Pong() {
msg, err := p.net.b.Pong()
p.net.log.AssertNoError(err)
if p.Send(msg) {
p.net.pong.numSent.Inc()
} else {
p.net.pong.numFailed.Inc()
}
}
// assumes the stateLock is not held
@ -458,17 +523,6 @@ func (p *peer) version(msg Msg) {
p.net.connected(p)
}
// assumes the stateLock is not held
func (p *peer) SendPeerList() {
ips := p.net.validatorIPs()
reply, err := p.net.b.PeerList(ips)
if err != nil {
p.net.log.Warn("failed to send PeerList message due to %s", err)
return
}
p.Send(reply)
}
// assumes the stateLock is not held
func (p *peer) getPeerList(_ Msg) { p.SendPeerList() }
@ -488,6 +542,12 @@ func (p *peer) peerList(msg Msg) {
p.net.stateLock.Unlock()
}
// assumes the stateLock is not held
func (p *peer) ping(_ Msg) { p.Pong() }
// assumes the stateLock is not held
func (p *peer) pong(_ Msg) {}
// assumes the stateLock is not held
func (p *peer) getAcceptedFrontier(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))

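Aside, not part of the diff: taken together, sendPings, the read deadline set in ReadMessages, and the deadline refresh in handle form a standard keepalive: ping on a timer, and drop the connection if nothing at all arrives within pingPongTimeout. A reduced sketch of that shape on a bare net.Conn; the timeouts are shortened so the demo finishes quickly, and none of these names are the peer API:

package main

import (
	"fmt"
	"net"
	"time"
)

// sendPings writes a ping on a ticker until told to stop, mirroring the
// shape of peer.sendPings above.
func sendPings(conn net.Conn, freq time.Duration, stop <-chan struct{}) {
	t := time.NewTicker(freq)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			if _, err := conn.Write([]byte("ping")); err != nil {
				return
			}
		case <-stop:
			return
		}
	}
}

// readLoop pushes the read deadline forward on every message it receives, so
// a peer that keeps sending anything never times out, while a silent peer is
// dropped after one timeout period.
func readLoop(conn net.Conn, timeout time.Duration) error {
	buf := make([]byte, 64)
	for {
		if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
			return err
		}
		if _, err := conn.Read(buf); err != nil {
			return err
		}
	}
}

func main() {
	local, remote := net.Pipe()
	stop := make(chan struct{})

	// The remote side pings for roughly 100ms, then goes silent.
	go sendPings(remote, 20*time.Millisecond, stop)
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(stop)
	}()

	err := readLoop(local, 60*time.Millisecond)
	fmt.Println("read loop ended:", err) // times out once the pings stop
}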
View File

@ -56,7 +56,7 @@ var (
genesisHashKey = []byte("genesisID")
// Version is the version of this code
Version = version.NewDefaultVersion("avalanche", 0, 5, 5)
Version = version.NewDefaultVersion("avalanche", 0, 6, 0)
versionParser = version.NewDefaultParser()
)

View File

@ -167,7 +167,11 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxByt
t.Config.Context.Log.Verbo("Put(%s, %d, %s) called", vdr, requestID, vtxID)
if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
if requestID == network.GossipMsgRequestID {
t.Config.Context.Log.Verbo("dropping gossip Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
} else {
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
}
return nil
}
@ -333,10 +337,10 @@ func (t *Transitive) reinsertFrom(vdr ids.ShortID, vtxID ids.ID) (bool, error) {
func (t *Transitive) insertFrom(vdr ids.ShortID, vtx avalanche.Vertex) (bool, error) {
issued := true
vts := []avalanche.Vertex{vtx}
for len(vts) > 0 {
vtx := vts[0]
vts = vts[1:]
vertexHeap := newMaxVertexHeap()
vertexHeap.Push(vtx)
for vertexHeap.Len() > 0 {
vtx := vertexHeap.Pop()
if t.Consensus.VertexIssued(vtx) {
continue
@ -351,7 +355,7 @@ func (t *Transitive) insertFrom(vdr ids.ShortID, vtx avalanche.Vertex) (bool, er
t.sendRequest(vdr, parent.ID())
issued = false
} else {
vts = append(vts, parent)
vertexHeap.Push(parent)
}
}

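Aside, not part of the diff: insertFrom now walks ancestors through a max vertex heap instead of a FIFO slice. Assuming newMaxVertexHeap orders vertices so the greatest (for example, the highest) is popped first and de-duplicates repeated pushes, the sketch below shows only the ordering half of that, built on container/heap with a plain struct standing in for avalanche.Vertex:

package main

import (
	"container/heap"
	"fmt"
)

type vertex struct {
	id     string
	height uint64
}

type maxVertexHeap []vertex

func (h maxVertexHeap) Len() int           { return len(h) }
func (h maxVertexHeap) Less(i, j int) bool { return h[i].height > h[j].height } // max-heap: greatest height first
func (h maxVertexHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *maxVertexHeap) Push(x interface{}) { *h = append(*h, x.(vertex)) }

func (h *maxVertexHeap) Pop() interface{} {
	old := *h
	n := len(old)
	v := old[n-1]
	*h = old[:n-1]
	return v
}

func main() {
	h := &maxVertexHeap{}
	heap.Init(h)
	heap.Push(h, vertex{"child", 7})
	heap.Push(h, vertex{"parent", 6})
	heap.Push(h, vertex{"grandparent", 5})
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(vertex).id) // child, then parent, then grandparent
	}
}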
View File

@ -7,6 +7,10 @@ import (
"github.com/ava-labs/gecko/ids"
)
const (
minRequestsSize = 32
)
type req struct {
vdr ids.ShortID
id uint32
@ -22,7 +26,7 @@ type Requests struct {
// are only in one request at a time.
func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) {
if r.reqsToID == nil {
r.reqsToID = make(map[[20]byte]map[uint32]ids.ID)
r.reqsToID = make(map[[20]byte]map[uint32]ids.ID, minRequestsSize)
}
vdrKey := vdr.Key()
vdrReqs, ok := r.reqsToID[vdrKey]
@ -33,7 +37,7 @@ func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) {
vdrReqs[requestID] = containerID
if r.idToReq == nil {
r.idToReq = make(map[[32]byte]req)
r.idToReq = make(map[[32]byte]req, minRequestsSize)
}
r.idToReq[containerID.Key()] = req{
vdr: vdr,

View File

@ -185,7 +185,11 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, blkID ids.I
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) error {
// bootstrapping isn't done --> we didn't send any gets --> this put is invalid
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID)
if requestID == network.GossipMsgRequestID {
t.Config.Context.Log.Verbo("dropping gossip Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID)
} else {
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID)
}
return nil
}

View File

@ -10,12 +10,16 @@ import (
"github.com/ava-labs/gecko/ids"
)
const (
minBlockerSize = 16
)
// Blocker tracks objects that are blocked
type Blocker map[[32]byte][]Blockable
func (b *Blocker) init() {
if *b == nil {
*b = make(map[[32]byte][]Blockable)
*b = make(map[[32]byte][]Blockable, minBlockerSize)
}
}

View File

@ -124,19 +124,12 @@ func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainI
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetAcceptedFrontierFailed(validatorID, requestID) {
sr.log.Debug("deferring GetAcceptedFrontier timeout due to a full queue on %s", chainID)
// Defer this call to later
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.GetAcceptedFrontierFailed(validatorID, chainID, requestID)
})
return
}
chain.GetAcceptedFrontierFailed(validatorID, requestID)
} else {
sr.log.Error("GetAcceptedFrontierFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// GetAccepted routes an incoming GetAccepted request from the
@ -176,18 +169,12 @@ func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetAcceptedFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring GetAccepted timeout due to a full queue on %s", chainID)
sr.GetAcceptedFailed(validatorID, chainID, requestID)
})
return
}
chain.GetAcceptedFailed(validatorID, requestID)
} else {
sr.log.Error("GetAcceptedFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// GetAncestors routes an incoming GetAncestors message from the validator with ID [validatorID]
@ -227,18 +214,12 @@ func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.I
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetAncestorsFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring GetAncestors timeout due to a full queue on %s", chainID)
sr.GetAncestorsFailed(validatorID, chainID, requestID)
})
return
}
chain.GetAncestorsFailed(validatorID, requestID)
} else {
sr.log.Error("GetAncestorsFailed(%s, %s, %d, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// Get routes an incoming Get request from the validator with ID [validatorID]
@ -278,18 +259,12 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring Get timeout due to a full queue on %s", chainID)
sr.GetFailed(validatorID, chainID, requestID)
})
return
}
chain.GetFailed(validatorID, requestID)
} else {
sr.log.Error("GetFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// PushQuery routes an incoming PushQuery request from the validator with ID [validatorID]
@ -341,18 +316,12 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.QueryFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring Query timeout due to a full queue on %s", chainID)
sr.QueryFailed(validatorID, chainID, requestID)
})
return
}
chain.QueryFailed(validatorID, requestID)
} else {
sr.log.Error("QueryFailed(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// Shutdown shuts down this router

View File

@ -4,12 +4,14 @@
package router
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/prometheus/client_golang/prometheus"
)
// Handler passes incoming messages from the network to the consensus engine
@ -17,12 +19,18 @@ import (
type Handler struct {
metrics
msgs chan message
closed chan struct{}
engine common.Engine
msgChan <-chan common.Message
msgs chan message
reliableMsgsSema chan struct{}
reliableMsgsLock sync.Mutex
reliableMsgs []message
closed chan struct{}
msgChan <-chan common.Message
ctx *snow.Context
engine common.Engine
toClose func()
closing bool
}
// Initialize this consensus handler
@ -35,9 +43,12 @@ func (h *Handler) Initialize(
) {
h.metrics.Initialize(namespace, metrics)
h.msgs = make(chan message, bufferSize)
h.reliableMsgsSema = make(chan struct{}, 1)
h.closed = make(chan struct{})
h.engine = engine
h.msgChan = msgChan
h.ctx = engine.Context()
h.engine = engine
}
// Context of this Handler
@ -46,37 +57,38 @@ func (h *Handler) Context() *snow.Context { return h.engine.Context() }
// Dispatch waits for incoming messages from the network
// and, when they arrive, sends them to the consensus engine
func (h *Handler) Dispatch() {
log := h.Context().Log
defer func() {
log.Info("finished shutting down chain")
h.ctx.Log.Info("finished shutting down chain")
close(h.closed)
}()
closing := false
for {
select {
case msg, ok := <-h.msgs:
if !ok {
// the msgs channel has been closed, so this dispatcher should exit
return
}
h.metrics.pending.Dec()
if closing {
log.Debug("dropping message due to closing:\n%s", msg)
continue
}
if h.dispatchMsg(msg) {
closing = true
h.dispatchMsg(msg)
case <-h.reliableMsgsSema:
// get all the reliable messages
h.reliableMsgsLock.Lock()
msgs := h.reliableMsgs
h.reliableMsgs = nil
h.reliableMsgsLock.Unlock()
// fire all the reliable messages
for _, msg := range msgs {
h.metrics.pending.Dec()
h.dispatchMsg(msg)
}
case msg := <-h.msgChan:
if closing {
log.Debug("dropping internal message due to closing:\n%s", msg)
continue
}
if h.dispatchMsg(message{messageType: notifyMsg, notification: msg}) {
closing = true
}
// handle a message from the VM
h.dispatchMsg(message{messageType: notifyMsg, notification: msg})
}
if closing && h.toClose != nil {
if h.closing && h.toClose != nil {
go h.toClose()
}
}
@ -85,14 +97,19 @@ func (h *Handler) Dispatch() {
// Dispatch a message to the consensus engine.
// Returns true iff this consensus handler (and its associated engine) should shutdown
// (due to receipt of a shutdown message)
func (h *Handler) dispatchMsg(msg message) bool {
func (h *Handler) dispatchMsg(msg message) {
if h.closing {
h.ctx.Log.Debug("dropping message due to closing:\n%s", msg)
h.metrics.dropped.Inc()
return
}
startTime := time.Now()
ctx := h.engine.Context()
ctx.Lock.Lock()
defer ctx.Lock.Unlock()
h.ctx.Lock.Lock()
defer h.ctx.Lock.Unlock()
ctx.Log.Verbo("Forwarding message to consensus: %s", msg)
h.ctx.Log.Verbo("Forwarding message to consensus: %s", msg)
var (
err error
done bool
@ -159,9 +176,10 @@ func (h *Handler) dispatchMsg(msg message) bool {
}
if err != nil {
ctx.Log.Fatal("forcing chain to shutdown due to %s", err)
h.ctx.Log.Fatal("forcing chain to shutdown due to %s", err)
}
return done || err != nil
h.closing = done || err != nil
}
// GetAcceptedFrontier passes a GetAcceptedFrontier message received from the
@ -187,8 +205,8 @@ func (h *Handler) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, co
// GetAcceptedFrontierFailed passes a GetAcceptedFrontierFailed message received
// from the network to the consensus engine.
func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getAcceptedFrontierFailedMsg,
validatorID: validatorID,
requestID: requestID,
@ -219,14 +237,43 @@ func (h *Handler) Accepted(validatorID ids.ShortID, requestID uint32, containerI
// GetAcceptedFailed passes a GetAcceptedFailed message received from the
// network to the consensus engine.
func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getAcceptedFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool {
return h.sendMsg(message{
messageType: getAncestorsMsg,
validatorID: validatorID,
requestID: requestID,
containerID: containerID,
})
}
// MultiPut passes a MultiPut message received from the network to the consensus engine.
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) bool {
return h.sendMsg(message{
messageType: multiPutMsg,
validatorID: validatorID,
requestID: requestID,
containers: containers,
})
}
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getAncestorsFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// Get passes a Get message received from the network to the consensus engine.
func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool {
return h.sendMsg(message{
@ -237,16 +284,6 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids
})
}
// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool {
return h.sendMsg(message{
messageType: getAncestorsMsg,
validatorID: validatorID,
requestID: requestID,
containerID: containerID,
})
}
// Put passes a Put message received from the network to the consensus engine.
func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) bool {
return h.sendMsg(message{
@ -258,34 +295,15 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids
})
}
// MultiPut passes a MultiPut message received from the network to the consensus engine.
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) bool {
return h.sendMsg(message{
messageType: multiPutMsg,
validatorID: validatorID,
requestID: requestID,
containers: containers,
})
}
// GetFailed passes a GetFailed message to the consensus engine.
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
messageType: getAncestorsFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// PushQuery passes a PushQuery message received from the network to the consensus engine.
func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID, block []byte) bool {
return h.sendMsg(message{
@ -318,8 +336,8 @@ func (h *Handler) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set
}
// QueryFailed passes a QueryFailed message received from the network to the consensus engine.
func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: queryFailedMsg,
validatorID: validatorID,
requestID: requestID,
@ -341,8 +359,9 @@ func (h *Handler) Notify(msg common.Message) bool {
// Shutdown shuts down the dispatcher
func (h *Handler) Shutdown() {
h.metrics.pending.Inc()
h.msgs <- message{messageType: shutdownMsg}
h.sendReliableMsg(message{
messageType: shutdownMsg,
})
}
func (h *Handler) sendMsg(msg message) bool {
@ -355,3 +374,15 @@ func (h *Handler) sendMsg(msg message) bool {
return false
}
}
func (h *Handler) sendReliableMsg(msg message) {
h.reliableMsgsLock.Lock()
defer h.reliableMsgsLock.Unlock()
h.metrics.pending.Inc()
h.reliableMsgs = append(h.reliableMsgs, msg)
select {
case h.reliableMsgsSema <- struct{}{}:
default:
}
}

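Aside, not part of the diff: the handler changes introduce a "reliable message" path for failure notifications and shutdown, so they can never be dropped by a full msgs channel. The mechanism is a mutex-guarded slice plus a 1-buffered channel used as a wake-up semaphore. A distilled sketch of that pattern with simplified names and string messages; this is not the Handler API itself:

package main

import (
	"fmt"
	"sync"
)

type reliableQueue struct {
	lock sync.Mutex
	msgs []string
	sema chan struct{}
}

func newReliableQueue() *reliableQueue {
	return &reliableQueue{sema: make(chan struct{}, 1)}
}

// push never blocks and never drops: the message is appended under the lock
// and the semaphore is nudged only if it is not already set.
func (q *reliableQueue) push(msg string) {
	q.lock.Lock()
	q.msgs = append(q.msgs, msg)
	q.lock.Unlock()

	select {
	case q.sema <- struct{}{}:
	default:
	}
}

// drain takes every queued message in one shot, the way the dispatch loop
// does when the semaphore case fires.
func (q *reliableQueue) drain() []string {
	q.lock.Lock()
	defer q.lock.Unlock()
	msgs := q.msgs
	q.msgs = nil
	return msgs
}

func main() {
	q := newReliableQueue()
	q.push("getFailedMsg")
	q.push("queryFailedMsg")

	<-q.sema // the dispatch loop selects on this alongside the regular msgs channel
	fmt.Println(q.drain()) // [getFailedMsg queryFailedMsg]
}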
View File

@ -31,17 +31,16 @@ func (s *Sender) Context() *snow.Context { return s.ctx }
// GetAcceptedFrontier ...
func (s *Sender) GetAcceptedFrontier(validatorIDs ids.ShortSet, requestID uint32) {
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAcceptedFrontier(s.ctx.NodeID, s.ctx.ChainID, requestID)
}
validatorList := validatorIDs.List()
for _, validatorID := range validatorList {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetAcceptedFrontierFailed(vID, s.ctx.ChainID, requestID)
})
}
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAcceptedFrontier(s.ctx.NodeID, s.ctx.ChainID, requestID)
}
s.sender.GetAcceptedFrontier(validatorIDs, s.ctx.ChainID, requestID)
}
@ -49,24 +48,23 @@ func (s *Sender) GetAcceptedFrontier(validatorIDs ids.ShortSet, requestID uint32
func (s *Sender) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) {
if validatorID.Equals(s.ctx.NodeID) {
go s.router.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs)
return
} else {
s.sender.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
s.sender.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
// GetAccepted ...
func (s *Sender) GetAccepted(validatorIDs ids.ShortSet, requestID uint32, containerIDs ids.Set) {
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAccepted(s.ctx.NodeID, s.ctx.ChainID, requestID, containerIDs)
}
validatorList := validatorIDs.List()
for _, validatorID := range validatorList {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetAcceptedFailed(vID, s.ctx.ChainID, requestID)
})
}
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAccepted(s.ctx.NodeID, s.ctx.ChainID, requestID, containerIDs)
}
s.sender.GetAccepted(validatorIDs, s.ctx.ChainID, requestID, containerIDs)
}
@ -74,9 +72,9 @@ func (s *Sender) GetAccepted(validatorIDs ids.ShortSet, requestID uint32, contai
func (s *Sender) Accepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) {
if validatorID.Equals(s.ctx.NodeID) {
go s.router.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs)
return
} else {
s.sender.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
s.sender.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
// Get sends a Get message to the consensus engine running on the specified
@ -85,6 +83,13 @@ func (s *Sender) Accepted(validatorID ids.ShortID, requestID uint32, containerID
// specified container.
func (s *Sender) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
s.ctx.Log.Verbo("Sending Get to validator %s. RequestID: %d. ContainerID: %s", validatorID, requestID, containerID)
// Sending a Get to myself will always fail
if validatorID.Equals(s.ctx.NodeID) {
go s.router.GetFailed(validatorID, s.ctx.ChainID, requestID)
return
}
// Add a timeout -- if we don't get a response before the timeout expires,
// send this consensus engine a GetFailed message
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
@ -101,6 +106,7 @@ func (s *Sender) GetAncestors(validatorID ids.ShortID, requestID uint32, contain
go s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID)
return
}
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID)
})
@ -130,6 +136,13 @@ func (s *Sender) MultiPut(validatorID ids.ShortID, requestID uint32, containers
// their preferred frontier given the existence of the specified container.
func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containerID ids.ID, container []byte) {
s.ctx.Log.Verbo("Sending PushQuery to validators %v. RequestID: %d. ContainerID: %s", validatorIDs, requestID, containerID)
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
// If one of the validators in [validatorIDs] is myself, send this message directly
// to my own router rather than sending it over the network
if validatorIDs.Contains(s.ctx.NodeID) { // One of the validators in [validatorIDs] was myself
@ -139,13 +152,7 @@ func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containe
// If this were not a goroutine, then we would deadlock here when [handler].msgs is full
go s.router.PushQuery(s.ctx.NodeID, s.ctx.ChainID, requestID, containerID, container)
}
validatorList := validatorIDs.List() // Convert set to list for easier iteration
for _, validatorID := range validatorList {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
s.sender.PushQuery(validatorIDs, s.ctx.ChainID, requestID, containerID, container)
}
@ -155,6 +162,14 @@ func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containe
// their preferred frontier.
func (s *Sender) PullQuery(validatorIDs ids.ShortSet, requestID uint32, containerID ids.ID) {
s.ctx.Log.Verbo("Sending PullQuery. RequestID: %d. ContainerID: %s", requestID, containerID)
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
// If one of the validators in [validatorIDs] is myself, send this message directly
// to my own router rather than sending it over the network
if validatorIDs.Contains(s.ctx.NodeID) { // One of the validators in [validatorIDs] was myself
@ -164,13 +179,7 @@ func (s *Sender) PullQuery(validatorIDs ids.ShortSet, requestID uint32, containe
// If this were not a goroutine, then we would deadlock when [handler].msgs is full
go s.router.PullQuery(s.ctx.NodeID, s.ctx.ChainID, requestID, containerID)
}
validatorList := validatorIDs.List() // Convert set to list for easier iteration
for _, validatorID := range validatorList {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
s.sender.PullQuery(validatorIDs, s.ctx.ChainID, requestID, containerID)
}
@ -181,9 +190,9 @@ func (s *Sender) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set)
// to my own router rather than sending it over the network
if validatorID.Equals(s.ctx.NodeID) {
go s.router.Chits(validatorID, s.ctx.ChainID, requestID, votes)
return
} else {
s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes)
}
s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes)
}
// Gossip the provided container

View File

@ -4,18 +4,20 @@
package sender
import (
"math/rand"
"reflect"
"sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/networking/timeout"
"github.com/ava-labs/gecko/utils/logging"
"github.com/prometheus/client_golang/prometheus"
)
func TestSenderContext(t *testing.T) {
@ -82,3 +84,128 @@ func TestTimeout(t *testing.T) {
t.Fatalf("Timeouts should have fired")
}
}
func TestReliableMessages(t *testing.T) {
tm := timeout.Manager{}
tm.Initialize(50 * time.Millisecond)
go tm.Dispatch()
chainRouter := router.ChainRouter{}
chainRouter.Initialize(logging.NoLog{}, &tm, time.Hour, time.Second)
sender := Sender{}
sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &chainRouter, &tm)
engine := common.EngineTest{T: t}
engine.Default(true)
engine.ContextF = snow.DefaultContextTest
engine.GossipF = func() error { return nil }
queriesToSend := 1000
awaiting := make([]chan struct{}, queriesToSend)
for i := 0; i < queriesToSend; i++ {
awaiting[i] = make(chan struct{}, 1)
}
engine.QueryFailedF = func(validatorID ids.ShortID, reqID uint32) error {
close(awaiting[int(reqID)])
return nil
}
handler := router.Handler{}
handler.Initialize(
&engine,
nil,
1,
"",
prometheus.NewRegistry(),
)
go handler.Dispatch()
chainRouter.AddChain(&handler)
go func() {
for i := 0; i < queriesToSend; i++ {
vdrIDs := ids.ShortSet{}
vdrIDs.Add(ids.NewShortID([20]byte{1}))
sender.PullQuery(vdrIDs, uint32(i), ids.Empty)
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
go func() {
for {
chainRouter.Gossip()
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
for _, await := range awaiting {
_, _ = <-await
}
}
func TestReliableMessagesToMyself(t *testing.T) {
tm := timeout.Manager{}
tm.Initialize(50 * time.Millisecond)
go tm.Dispatch()
chainRouter := router.ChainRouter{}
chainRouter.Initialize(logging.NoLog{}, &tm, time.Hour, time.Second)
sender := Sender{}
sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &chainRouter, &tm)
engine := common.EngineTest{T: t}
engine.Default(false)
engine.ContextF = snow.DefaultContextTest
engine.GossipF = func() error { return nil }
engine.CantPullQuery = false
queriesToSend := 2
awaiting := make([]chan struct{}, queriesToSend)
for i := 0; i < queriesToSend; i++ {
awaiting[i] = make(chan struct{}, 1)
}
engine.QueryFailedF = func(validatorID ids.ShortID, reqID uint32) error {
close(awaiting[int(reqID)])
return nil
}
handler := router.Handler{}
handler.Initialize(
&engine,
nil,
1,
"",
prometheus.NewRegistry(),
)
go handler.Dispatch()
chainRouter.AddChain(&handler)
go func() {
for i := 0; i < queriesToSend; i++ {
vdrIDs := ids.ShortSet{}
vdrIDs.Add(engine.Context().NodeID)
sender.PullQuery(vdrIDs, uint32(i), ids.Empty)
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
go func() {
for {
chainRouter.Gossip()
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
for _, await := range awaiting {
_, _ = <-await
}
}