This commit is contained in:
Jae Kwon 2014-06-08 17:17:17 -07:00
parent c92fa8a08e
commit 4e0010a4e9
2 changed files with 130 additions and 14 deletions

View File

@ -8,6 +8,7 @@ import (
. "github.com/tendermint/tendermint/binary"
crand "crypto/rand" // for seeding
"encoding/binary"
"encoding/json"
"io"
"math"
"math/rand"
@ -15,10 +16,13 @@ import (
"sync"
"sync/atomic"
"time"
"os"
"fmt"
)
/* AddrManager - concurrency safe peer address manager */
type AddrManager struct {
mtx sync.Mutex
rand *rand.Rand
key [32]byte
addrIndex map[string]*KnownAddress // addr.String() -> KnownAddress
@ -27,10 +31,11 @@ type AddrManager struct {
started int32
shutdown int32
wg sync.WaitGroup
quit chan bool
quit chan struct{}
nOld int
nNew int
localAddresses map[string]*localAddress
filePath string
}
const (
@ -86,16 +91,18 @@ const (
)
// Use Start to begin processing asynchronous address updates.
func NewAddrManager() *AddrManager {
func NewAddrManager(filePath string) *AddrManager {
am := AddrManager{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
quit: make(chan bool),
quit: make(chan struct{}),
localAddresses: make(map[string]*localAddress),
filePath: filePath,
}
am.init()
return &am
}
// When modifying this, don't forget to update loadFromFile()
func (a *AddrManager) init() {
a.addrIndex = make(map[string]*KnownAddress)
io.ReadFull(crand.Reader, a.key[:])
@ -110,8 +117,8 @@ func (a *AddrManager) init() {
func (a *AddrManager) Start() {
if atomic.AddInt32(&a.started, 1) != 1 { return }
amgrLog.Trace("Starting address manager")
a.loadFromFile(a.filePath)
a.wg.Add(1)
a.loadPeers()
go a.addressHandler()
}
@ -123,7 +130,7 @@ func (a *AddrManager) Stop() {
}
func (a *AddrManager) AddAddress(addr *NetAddress, src *NetAddress) {
// XXX use a channel for concurrency
a.mtx.Lock(); defer a.mtx.Unlock()
a.addAddress(addr, src)
}
@ -132,12 +139,15 @@ func (a *AddrManager) NeedMoreAddresses() bool {
}
func (a *AddrManager) NumAddresses() int {
a.mtx.Lock(); defer a.mtx.Unlock()
return a.nOld + a.nNew
}
// Pick a new address to connect to.
func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress {
if a.NumAddresses() == 0 { return nil }
a.mtx.Lock(); defer a.mtx.Unlock()
if a.nOld == 0 && a.nNew == 0 { return nil }
if newBias > 100 { newBias = 100 }
if newBias < 0 { newBias = 0 }
@ -146,28 +156,110 @@ func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress {
newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias)
if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
// Old entry.
// XXX
// pick random Old bucket.
var bucket []*KnownAddress = nil
for len(bucket) == 0 {
bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
}
// pick a random ka from bucket.
return bucket[a.rand.Intn(len(bucket))]
} else {
// New entry.
// XXX
// pick random New bucket.
var bucket map[string]*KnownAddress = nil
for len(bucket) == 0 {
bucket = a.addrNew[a.rand.Intn(len(a.addrNew))]
}
// pick a random ka from bucket.
randIndex := a.rand.Intn(len(bucket))
for _, ka := range bucket {
randIndex--
if randIndex == 0 {
return ka
}
}
panic("Should not happen")
}
return nil
}
func (a *AddrManager) MarkGood(ka *KnownAddress) {
a.mtx.Lock(); defer a.mtx.Unlock()
ka.MarkAttempt(true)
a.moveToOld(ka)
}
func (a *AddrManager) MarkBad(ka *KnownAddress) {
a.mtx.Lock(); defer a.mtx.Unlock()
ka.MarkAttempt(false)
}
/* Loading & Saving */
func (a *AddrManager) loadPeers() {
type addrManagerJSON struct {
Key [32]byte
AddrNew [newBucketCount]map[string]*KnownAddress
AddrOld [oldBucketCount][]*KnownAddress
NOld int
NNew int
}
func (a *AddrManager) savePeers() {
func (a *AddrManager) saveToFile(filePath string) {
aJSON := &addrManagerJSON{
Key: a.key,
AddrNew: a.addrNew,
AddrOld: a.addrOld,
NOld: a.nOld,
NNew: a.nNew,
}
w, err := os.Create(filePath)
if err != nil {
amgrLog.Error("Error opening file: ", filePath, err)
return
}
enc := json.NewEncoder(w)
defer w.Close()
err = enc.Encode(&aJSON)
if err != nil { panic(err) }
}
func (a *AddrManager) loadFromFile(filePath string) {
// If doesn't exist, do nothing.
_, err := os.Stat(filePath)
if os.IsNotExist(err) { return }
r, err := os.Open(filePath)
if err != nil {
panic(fmt.Errorf("%s error opening file: %v", filePath, err))
}
defer r.Close()
aJSON := &addrManagerJSON{}
dec := json.NewDecoder(r)
err = dec.Decode(aJSON)
if err != nil {
panic(fmt.Errorf("error reading %s: %v", filePath, err))
}
// Now we need to initialize 'a'.
copy(a.key[:], aJSON.Key[:])
a.addrNew = aJSON.AddrNew
for i, oldBucket := range aJSON.AddrOld {
copy(a.addrOld[i], oldBucket)
}
a.nNew = aJSON.NNew
a.nOld = aJSON.NOld
a.addrIndex = make(map[string]*KnownAddress)
for _, newBucket := range a.addrNew {
for key, ka := range newBucket {
a.addrIndex[key] = ka
}
}
}
/* Private methods */
func (a *AddrManager) addressHandler() {
@ -176,13 +268,13 @@ out:
for {
select {
case <-dumpAddressTicker.C:
a.savePeers()
a.saveToFile(a.filePath)
case <-a.quit:
break out
}
}
dumpAddressTicker.Stop()
a.savePeers()
a.saveToFile(a.filePath)
a.wg.Done()
amgrLog.Trace("Address handler done")
}

View File

@ -3,6 +3,7 @@ package peer
import (
. "github.com/tendermint/tendermint/binary"
"time"
"io"
)
/*
@ -31,6 +32,29 @@ func NewKnownAddress(addr *NetAddress, src *NetAddress) *KnownAddress {
}
}
func ReadKnownAddress(r io.Reader) *KnownAddress {
return &KnownAddress{
Addr: ReadNetAddress(r),
Src: ReadNetAddress(r),
Attempts: ReadUInt32(r),
LastAttempt: ReadUInt64(r),
LastSuccess: ReadUInt64(r),
NewRefs: ReadUInt16(r),
OldBucket: ReadInt16(r),
}
}
func (ka *KnownAddress) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(ka.Addr, w, n, err)
n, err = WriteOnto(ka.Src, w, n, err)
n, err = WriteOnto(ka.Attempts, w, n, err)
n, err = WriteOnto(ka.LastAttempt, w, n, err)
n, err = WriteOnto(ka.LastSuccess, w, n, err)
n, err = WriteOnto(ka.NewRefs, w, n, err)
n, err = WriteOnto(ka.OldBucket, w, n, err)
return
}
func (ka *KnownAddress) MarkAttempt(success bool) {
now := UInt64(time.Now().Unix())
ka.LastAttempt = now