// Modified for Tendermint // Originally Copyright (c) 2013-2014 Conformal Systems LLC. // https://github.com/conformal/btcd/blob/master/LICENSE package peer import ( . "github.com/tendermint/tendermint/binary" crand "crypto/rand" // for seeding "encoding/binary" "encoding/json" "io" "math" "math/rand" "net" "sync" "sync/atomic" "time" "os" "fmt" ) /* AddrManager - concurrency safe peer address manager */ type AddrManager struct { mtx sync.Mutex rand *rand.Rand key [32]byte addrIndex map[string]*KnownAddress // addr.String() -> KnownAddress addrNew [newBucketCount]map[string]*KnownAddress addrOld [oldBucketCount][]*KnownAddress started int32 shutdown int32 wg sync.WaitGroup quit chan struct{} nOld int nNew int localAddresses map[string]*localAddress filePath string } const ( // addresses under which the address manager will claim to need more addresses. needAddressThreshold = 1000 // interval used to dump the address cache to disk for future use. dumpAddressInterval = time.Minute * 2 // max addresses in each old address bucket. oldBucketSize = 64 // buckets we split old addresses over. oldBucketCount = 64 // max addresses in each new address bucket. newBucketSize = 64 // buckets that we spread new addresses over. newBucketCount = 256 // old buckets over which an address group will be spread. oldBucketsPerGroup = 4 // new buckets over which an source address group will be spread. newBucketsPerGroup = 32 // buckets a frequently seen new address may end up in. newBucketsPerAddress = 4 // days before which we assume an address has vanished // if we have not seen it announced in that long. numMissingDays = 30 // tries without a single success before we assume an address is bad. numRetries = 3 // max failures we will accept without a success before considering an address bad. maxFailures = 10 // days since the last success before we will consider evicting an address. minBadDays = 7 // max addresses that we will send in response to a getAddr // (in practise the most addresses we will return from a call to AddressCache()). getAddrMax = 2500 // % of total addresses known that we will share with a call to AddressCache. getAddrPercent = 23 // current version of the on-disk format. serialisationVersion = 1 ) // Use Start to begin processing asynchronous address updates. func NewAddrManager(filePath string) *AddrManager { am := AddrManager{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), quit: make(chan struct{}), localAddresses: make(map[string]*localAddress), filePath: filePath, } am.init() return &am } // When modifying this, don't forget to update loadFromFile() func (a *AddrManager) init() { a.addrIndex = make(map[string]*KnownAddress) io.ReadFull(crand.Reader, a.key[:]) for i := range a.addrNew { a.addrNew[i] = make(map[string]*KnownAddress) } for i := range a.addrOld { a.addrOld[i] = make([]*KnownAddress, 0, oldBucketSize) } } func (a *AddrManager) Start() { if atomic.AddInt32(&a.started, 1) != 1 { return } amgrLog.Trace("Starting address manager") a.loadFromFile(a.filePath) a.wg.Add(1) go a.addressHandler() } func (a *AddrManager) Stop() { if atomic.AddInt32(&a.shutdown, 1) != 1 { return } amgrLog.Infof("Address manager shutting down") close(a.quit) a.wg.Wait() } func (a *AddrManager) AddAddress(addr *NetAddress, src *NetAddress) { a.mtx.Lock(); defer a.mtx.Unlock() a.addAddress(addr, src) } func (a *AddrManager) NeedMoreAddresses() bool { return a.NumAddresses() < needAddressThreshold } func (a *AddrManager) NumAddresses() int { a.mtx.Lock(); defer a.mtx.Unlock() return a.nOld + a.nNew } // Pick a new address to connect to. func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress { a.mtx.Lock(); defer a.mtx.Unlock() if a.nOld == 0 && a.nNew == 0 { return nil } if newBias > 100 { newBias = 100 } if newBias < 0 { newBias = 0 } // Bias between new and old addresses. oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias)) newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias) if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation { // pick random Old bucket. var bucket []*KnownAddress = nil for len(bucket) == 0 { bucket = a.addrOld[a.rand.Intn(len(a.addrOld))] } // pick a random ka from bucket. return bucket[a.rand.Intn(len(bucket))] } else { // pick random New bucket. var bucket map[string]*KnownAddress = nil for len(bucket) == 0 { bucket = a.addrNew[a.rand.Intn(len(a.addrNew))] } // pick a random ka from bucket. randIndex := a.rand.Intn(len(bucket)) for _, ka := range bucket { randIndex-- if randIndex == 0 { return ka } } panic("Should not happen") } return nil } func (a *AddrManager) MarkGood(ka *KnownAddress) { a.mtx.Lock(); defer a.mtx.Unlock() ka.MarkAttempt(true) a.moveToOld(ka) } func (a *AddrManager) MarkBad(ka *KnownAddress) { a.mtx.Lock(); defer a.mtx.Unlock() ka.MarkAttempt(false) } /* Loading & Saving */ type addrManagerJSON struct { Key [32]byte AddrNew [newBucketCount]map[string]*KnownAddress AddrOld [oldBucketCount][]*KnownAddress NOld int NNew int } func (a *AddrManager) saveToFile(filePath string) { aJSON := &addrManagerJSON{ Key: a.key, AddrNew: a.addrNew, AddrOld: a.addrOld, NOld: a.nOld, NNew: a.nNew, } w, err := os.Create(filePath) if err != nil { amgrLog.Error("Error opening file: ", filePath, err) return } enc := json.NewEncoder(w) defer w.Close() err = enc.Encode(&aJSON) if err != nil { panic(err) } } func (a *AddrManager) loadFromFile(filePath string) { // If doesn't exist, do nothing. _, err := os.Stat(filePath) if os.IsNotExist(err) { return } r, err := os.Open(filePath) if err != nil { panic(fmt.Errorf("%s error opening file: %v", filePath, err)) } defer r.Close() aJSON := &addrManagerJSON{} dec := json.NewDecoder(r) err = dec.Decode(aJSON) if err != nil { panic(fmt.Errorf("error reading %s: %v", filePath, err)) } // Now we need to initialize 'a'. copy(a.key[:], aJSON.Key[:]) a.addrNew = aJSON.AddrNew for i, oldBucket := range aJSON.AddrOld { copy(a.addrOld[i], oldBucket) } a.nNew = aJSON.NNew a.nOld = aJSON.NOld a.addrIndex = make(map[string]*KnownAddress) for _, newBucket := range a.addrNew { for key, ka := range newBucket { a.addrIndex[key] = ka } } } /* Private methods */ func (a *AddrManager) addressHandler() { dumpAddressTicker := time.NewTicker(dumpAddressInterval) out: for { select { case <-dumpAddressTicker.C: a.saveToFile(a.filePath) case <-a.quit: break out } } dumpAddressTicker.Stop() a.saveToFile(a.filePath) a.wg.Done() amgrLog.Trace("Address handler done") } func (a *AddrManager) addAddress(addr, src *NetAddress) { if !addr.Routable() { return } key := addr.String() ka := a.addrIndex[key] if ka != nil { // Already added if ka.OldBucket != -1 { return } if ka.NewRefs == newBucketsPerAddress { return } // The more entries we have, the less likely we are to add more. factor := int32(2 * ka.NewRefs) if a.rand.Int31n(factor) != 0 { return } } else { ka = NewKnownAddress(addr, src) a.addrIndex[key] = ka a.nNew++ } bucket := a.getNewBucket(addr, src) // Already exists? if _, ok := a.addrNew[bucket][key]; ok { return } // Enforce max addresses. if len(a.addrNew[bucket]) > newBucketSize { amgrLog.Tracef("new bucket is full, expiring old ") a.expireNew(bucket) } // Add to new bucket. ka.NewRefs++ a.addrNew[bucket][key] = ka amgrLog.Tracef("Added new address %s for a total of %d addresses", addr, a.nOld+a.nNew) } // Make space in the new buckets by expiring the really bad entries. // If no bad entries are available we look at a few and remove the oldest. func (a *AddrManager) expireNew(bucket int) { var oldest *KnownAddress for k, v := range a.addrNew[bucket] { // If an entry is bad, throw it away if v.Bad() { amgrLog.Tracef("expiring bad address %v", k) delete(a.addrNew[bucket], k) v.NewRefs-- if v.NewRefs == 0 { a.nNew-- delete(a.addrIndex, k) } return } // or, keep track of the oldest entry if oldest == nil { oldest = v } else if v.LastAttempt < oldest.LastAttempt { oldest = v } } // If we haven't thrown out a bad entry, throw out the oldest entry if oldest != nil { key := oldest.Addr.String() amgrLog.Tracef("expiring oldest address %v", key) delete(a.addrNew[bucket], key) oldest.NewRefs-- if oldest.NewRefs == 0 { a.nNew-- delete(a.addrIndex, key) } } } func (a *AddrManager) moveToOld(ka *KnownAddress) { // Remove from all new buckets. // Remember one of those new buckets. addrKey := ka.Addr.String() freedBucket := -1 for i := range a.addrNew { // we check for existance so we can record the first one if _, ok := a.addrNew[i][addrKey]; ok { delete(a.addrNew[i], addrKey) ka.NewRefs-- if freedBucket == -1 { freedBucket = i } } } a.nNew-- if freedBucket == -1 { panic("Expected to find addr in at least one new bucket") } oldBucket := a.getOldBucket(ka.Addr) // If room in oldBucket, put it in. if len(a.addrOld[oldBucket]) < oldBucketSize { ka.OldBucket = Int16(oldBucket) a.addrOld[oldBucket] = append(a.addrOld[oldBucket], ka) a.nOld++ return } // No room, we have to evict something else. rmkaIndex := a.pickOld(oldBucket) rmka := a.addrOld[oldBucket][rmkaIndex] // Find a new bucket to put rmka in. newBucket := a.getNewBucket(rmka.Addr, rmka.Src) if len(a.addrNew[newBucket]) >= newBucketSize { newBucket = freedBucket } // replace with ka in list. ka.OldBucket = Int16(oldBucket) a.addrOld[oldBucket][rmkaIndex] = ka rmka.OldBucket = -1 // put rmka into new bucket rmkey := rmka.Addr.String() amgrLog.Tracef("Replacing %s with %s in old", rmkey, addrKey) a.addrNew[newBucket][rmkey] = rmka rmka.NewRefs++ a.nNew++ } // Returns the index in old bucket of oldest entry. func (a *AddrManager) pickOld(bucket int) int { var oldest *KnownAddress var oldestIndex int for i, ka := range a.addrOld[bucket] { if oldest == nil || ka.LastAttempt < oldest.LastAttempt { oldest = ka oldestIndex = i } } return oldestIndex } // doublesha256(key + sourcegroup + // int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes func (a *AddrManager) getNewBucket(addr, src *NetAddress) int { data1 := []byte{} data1 = append(data1, a.key[:]...) data1 = append(data1, []byte(GroupKey(addr))...) data1 = append(data1, []byte(GroupKey(src))...) hash1 := DoubleSha256(data1) hash64 := binary.LittleEndian.Uint64(hash1) hash64 %= newBucketsPerGroup var hashbuf [8]byte binary.LittleEndian.PutUint64(hashbuf[:], hash64) data2 := []byte{} data2 = append(data2, a.key[:]...) data2 = append(data2, GroupKey(src)...) data2 = append(data2, hashbuf[:]...) hash2 := DoubleSha256(data2) return int(binary.LittleEndian.Uint64(hash2) % newBucketCount) } // doublesha256(key + group + truncate_to_64bits(doublesha256(key + addr))%buckets_per_group) % num_buckets func (a *AddrManager) getOldBucket(addr *NetAddress) int { data1 := []byte{} data1 = append(data1, a.key[:]...) data1 = append(data1, []byte(addr.String())...) hash1 := DoubleSha256(data1) hash64 := binary.LittleEndian.Uint64(hash1) hash64 %= oldBucketsPerGroup var hashbuf [8]byte binary.LittleEndian.PutUint64(hashbuf[:], hash64) data2 := []byte{} data2 = append(data2, a.key[:]...) data2 = append(data2, GroupKey(addr)...) data2 = append(data2, hashbuf[:]...) hash2 := DoubleSha256(data2) return int(binary.LittleEndian.Uint64(hash2) % oldBucketCount) } ///// LOCAL ADDRESS // addressPrio is an enum type used to describe the heirarchy of local address // discovery methods. type addressPrio int const ( InterfacePrio addressPrio = iota // address of local interface. BoundPrio // Address explicitly bound to. UpnpPrio // External IP discovered from UPnP HttpPrio // Obtained from internet service. ManualPrio // provided by --externalip. ) type localAddress struct { Addr *NetAddress Score addressPrio } // addLocalAddress adds addr to the list of known local addresses to advertise // with the given priority. func (a *AddrManager) addLocalAddress(addr *NetAddress, priority addressPrio) { // sanity check. if !addr.Routable() { amgrLog.Debugf("rejecting address %s:%d due to routability", addr.IP, addr.Port) return } amgrLog.Debugf("adding address %s:%d", addr.IP, addr.Port) key := addr.String() la, ok := a.localAddresses[key] if !ok || la.Score < priority { if ok { la.Score = priority + 1 } else { a.localAddresses[key] = &localAddress{ Addr: addr, Score: priority, } } } } // getBestLocalAddress returns the most appropriate local address that we know // of to be contacted by rna (remote net address) func (a *AddrManager) getBestLocalAddress(rna *NetAddress) *NetAddress { bestReach := 0 var bestScore addressPrio var bestAddr *NetAddress for _, la := range a.localAddresses { reach := rna.ReachabilityTo(la.Addr) if reach > bestReach || (reach == bestReach && la.Score > bestScore) { bestReach = reach bestScore = la.Score bestAddr = la.Addr } } if bestAddr != nil { amgrLog.Debugf("Suggesting address %s:%d for %s:%d", bestAddr.IP, bestAddr.Port, rna.IP, rna.Port) } else { amgrLog.Debugf("No worthy address for %s:%d", rna.IP, rna.Port) // Send something unroutable if nothing suitable. bestAddr = &NetAddress{ IP: net.IP([]byte{0, 0, 0, 0}), Port: 0, } } return bestAddr } // Return a string representing the network group of this address. // This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string // "local" for a local address and the string "unroutable for an unroutable // address. func GroupKey (na *NetAddress) string { if na.Local() { return "local" } if !na.Routable() { return "unroutable" } if ipv4 := na.IP.To4(); ipv4 != nil { return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String() } if na.RFC6145() || na.RFC6052() { // last four bytes are the ip address ip := net.IP(na.IP[12:16]) return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() } if na.RFC3964() { ip := net.IP(na.IP[2:7]) return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() } if na.RFC4380() { // teredo tunnels have the last 4 bytes as the v4 address XOR // 0xff. ip := net.IP(make([]byte, 4)) for i, byte := range na.IP[12:16] { ip[i] = byte ^ 0xff } return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() } // OK, so now we know ourselves to be a IPv6 address. // bitcoind uses /32 for everything, except for Hurricane Electric's // (he.net) IP range, which it uses /36 for. bits := 32 heNet := &net.IPNet{IP: net.ParseIP("2001:470::"), Mask: net.CIDRMask(32, 128)} if heNet.Contains(na.IP) { bits = 36 } return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String() }