diff --git a/peer/addrmanager.go b/peer/addrmanager.go new file mode 100644 index 00000000..e837ebd8 --- /dev/null +++ b/peer/addrmanager.go @@ -0,0 +1,496 @@ +// Modified for Tendermint +// Originally Copyright (c) 2013-2014 Conformal Systems LLC. +// https://github.com/conformal/btcd/blob/master/LICENSE + +package peer + +import ( + . "github.com/tendermint/tendermint/binary" + crand "crypto/rand" // for seeding + "encoding/binary" + "io" + "math" + "math/rand" + "net" + "sync" + "sync/atomic" + "time" +) + +/* AddrManager - concurrency safe peer address manager */ +type AddrManager struct { + rand *rand.Rand + key [32]byte + addrIndex map[string]*KnownAddress // addr.String() -> KnownAddress + addrNew [newBucketCount]map[string]*KnownAddress + addrOld [oldBucketCount][]*KnownAddress + started int32 + shutdown int32 + wg sync.WaitGroup + quit chan bool + nOld int + nNew int + localAddresses map[string]*localAddress +} + +const ( + // addresses under which the address manager will claim to need more addresses. + needAddressThreshold = 1000 + + // interval used to dump the address cache to disk for future use. + dumpAddressInterval = time.Minute * 2 + + // max addresses in each old address bucket. + oldBucketSize = 64 + + // buckets we split old addresses over. + oldBucketCount = 64 + + // max addresses in each new address bucket. + newBucketSize = 64 + + // buckets that we spread new addresses over. + newBucketCount = 256 + + // old buckets over which an address group will be spread. + oldBucketsPerGroup = 4 + + // new buckets over which an source address group will be spread. + newBucketsPerGroup = 32 + + // buckets a frequently seen new address may end up in. + newBucketsPerAddress = 4 + + // days before which we assume an address has vanished + // if we have not seen it announced in that long. + numMissingDays = 30 + + // tries without a single success before we assume an address is bad. + numRetries = 3 + + // max failures we will accept without a success before considering an address bad. + maxFailures = 10 + + // days since the last success before we will consider evicting an address. + minBadDays = 7 + + // max addresses that we will send in response to a getAddr + // (in practise the most addresses we will return from a call to AddressCache()). + getAddrMax = 2500 + + // % of total addresses known that we will share with a call to AddressCache. + getAddrPercent = 23 + + // current version of the on-disk format. + serialisationVersion = 1 +) + +// Use Start to begin processing asynchronous address updates. +func NewAddrManager() *AddrManager { + am := AddrManager{ + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + quit: make(chan bool), + localAddresses: make(map[string]*localAddress), + } + am.init() + return &am +} + +func (a *AddrManager) init() { + a.addrIndex = make(map[string]*KnownAddress) + io.ReadFull(crand.Reader, a.key[:]) + for i := range a.addrNew { + a.addrNew[i] = make(map[string]*KnownAddress) + } + for i := range a.addrOld { + a.addrOld[i] = make([]*KnownAddress, 0, oldBucketSize) + } +} + +func (a *AddrManager) Start() { + if atomic.AddInt32(&a.started, 1) != 1 { return } + amgrLog.Trace("Starting address manager") + a.wg.Add(1) + a.loadPeers() + go a.addressHandler() +} + +func (a *AddrManager) Stop() { + if atomic.AddInt32(&a.shutdown, 1) != 1 { return } + amgrLog.Infof("Address manager shutting down") + close(a.quit) + a.wg.Wait() +} + +func (a *AddrManager) AddAddress(addr *NetAddress, src *NetAddress) { + // XXX use a channel for concurrency + a.addAddress(addr, src) +} + +func (a *AddrManager) NeedMoreAddresses() bool { + return a.NumAddresses() < needAddressThreshold +} + +func (a *AddrManager) NumAddresses() int { + return a.nOld + a.nNew +} + +// Pick a new address to connect to. +func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress { + if a.NumAddresses() == 0 { return nil } + if newBias > 100 { newBias = 100 } + if newBias < 0 { newBias = 0 } + + // Bias between new and old addresses. + oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias)) + newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias) + + if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation { + // Old entry. + // XXX + } else { + // New entry. + // XXX + } + return nil +} + +func (a *AddrManager) MarkGood(ka *KnownAddress) { + ka.MarkAttempt(true) + a.moveToOld(ka) +} + +/* Loading & Saving */ + +func (a *AddrManager) loadPeers() { +} + +func (a *AddrManager) savePeers() { +} + +/* Private methods */ + +func (a *AddrManager) addressHandler() { + dumpAddressTicker := time.NewTicker(dumpAddressInterval) +out: + for { + select { + case <-dumpAddressTicker.C: + a.savePeers() + case <-a.quit: + break out + } + } + dumpAddressTicker.Stop() + a.savePeers() + a.wg.Done() + amgrLog.Trace("Address handler done") +} + +func (a *AddrManager) addAddress(addr, src *NetAddress) { + if !addr.Routable() { return } + + key := addr.String() + ka := a.addrIndex[key] + + if ka != nil { + // Already added + if ka.OldBucket != -1 { return } + if ka.NewRefs == newBucketsPerAddress { return } + + // The more entries we have, the less likely we are to add more. + factor := int32(2 * ka.NewRefs) + if a.rand.Int31n(factor) != 0 { + return + } + } else { + ka = NewKnownAddress(addr, src) + a.addrIndex[key] = ka + a.nNew++ + } + + bucket := a.getNewBucket(addr, src) + + // Already exists? + if _, ok := a.addrNew[bucket][key]; ok { + return + } + + // Enforce max addresses. + if len(a.addrNew[bucket]) > newBucketSize { + amgrLog.Tracef("new bucket is full, expiring old ") + a.expireNew(bucket) + } + + // Add to new bucket. + ka.NewRefs++ + a.addrNew[bucket][key] = ka + + amgrLog.Tracef("Added new address %s for a total of %d addresses", addr, a.nOld+a.nNew) +} + +// Make space in the new buckets by expiring the really bad entries. +// If no bad entries are available we look at a few and remove the oldest. +func (a *AddrManager) expireNew(bucket int) { + var oldest *KnownAddress + for k, v := range a.addrNew[bucket] { + // If an entry is bad, throw it away + if v.Bad() { + amgrLog.Tracef("expiring bad address %v", k) + delete(a.addrNew[bucket], k) + v.NewRefs-- + if v.NewRefs == 0 { + a.nNew-- + delete(a.addrIndex, k) + } + return + } + // or, keep track of the oldest entry + if oldest == nil { + oldest = v + } else if v.LastAttempt < oldest.LastAttempt { + oldest = v + } + } + + // If we haven't thrown out a bad entry, throw out the oldest entry + if oldest != nil { + key := oldest.Addr.String() + amgrLog.Tracef("expiring oldest address %v", key) + delete(a.addrNew[bucket], key) + oldest.NewRefs-- + if oldest.NewRefs == 0 { + a.nNew-- + delete(a.addrIndex, key) + } + } +} + +func (a *AddrManager) moveToOld(ka *KnownAddress) { + // Remove from all new buckets. + // Remember one of those new buckets. + addrKey := ka.Addr.String() + freedBucket := -1 + for i := range a.addrNew { + // we check for existance so we can record the first one + if _, ok := a.addrNew[i][addrKey]; ok { + delete(a.addrNew[i], addrKey) + ka.NewRefs-- + if freedBucket == -1 { + freedBucket = i + } + } + } + a.nNew-- + if freedBucket == -1 { panic("Expected to find addr in at least one new bucket") } + + oldBucket := a.getOldBucket(ka.Addr) + + // If room in oldBucket, put it in. + if len(a.addrOld[oldBucket]) < oldBucketSize { + ka.OldBucket = Int16(oldBucket) + a.addrOld[oldBucket] = append(a.addrOld[oldBucket], ka) + a.nOld++ + return + } + + // No room, we have to evict something else. + rmkaIndex := a.pickOld(oldBucket) + rmka := a.addrOld[oldBucket][rmkaIndex] + + // Find a new bucket to put rmka in. + newBucket := a.getNewBucket(rmka.Addr, rmka.Src) + if len(a.addrNew[newBucket]) >= newBucketSize { + newBucket = freedBucket + } + + // replace with ka in list. + ka.OldBucket = Int16(oldBucket) + a.addrOld[oldBucket][rmkaIndex] = ka + rmka.OldBucket = -1 + + // put rmka into new bucket + rmkey := rmka.Addr.String() + amgrLog.Tracef("Replacing %s with %s in old", rmkey, addrKey) + a.addrNew[newBucket][rmkey] = rmka + rmka.NewRefs++ + a.nNew++ +} + +// Returns the index in old bucket of oldest entry. +func (a *AddrManager) pickOld(bucket int) int { + var oldest *KnownAddress + var oldestIndex int + for i, ka := range a.addrOld[bucket] { + if oldest == nil || ka.LastAttempt < oldest.LastAttempt { + oldest = ka + oldestIndex = i + } + } + return oldestIndex +} + +// doublesha256(key + sourcegroup + +// int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes +func (a *AddrManager) getNewBucket(addr, src *NetAddress) int { + data1 := []byte{} + data1 = append(data1, a.key[:]...) + data1 = append(data1, []byte(GroupKey(addr))...) + data1 = append(data1, []byte(GroupKey(src))...) + hash1 := DoubleSha256(data1) + hash64 := binary.LittleEndian.Uint64(hash1) + hash64 %= newBucketsPerGroup + var hashbuf [8]byte + binary.LittleEndian.PutUint64(hashbuf[:], hash64) + data2 := []byte{} + data2 = append(data2, a.key[:]...) + data2 = append(data2, GroupKey(src)...) + data2 = append(data2, hashbuf[:]...) + + hash2 := DoubleSha256(data2) + return int(binary.LittleEndian.Uint64(hash2) % newBucketCount) +} + +// doublesha256(key + group + truncate_to_64bits(doublesha256(key + addr))%buckets_per_group) % num_buckets +func (a *AddrManager) getOldBucket(addr *NetAddress) int { + data1 := []byte{} + data1 = append(data1, a.key[:]...) + data1 = append(data1, []byte(addr.String())...) + hash1 := DoubleSha256(data1) + hash64 := binary.LittleEndian.Uint64(hash1) + hash64 %= oldBucketsPerGroup + var hashbuf [8]byte + binary.LittleEndian.PutUint64(hashbuf[:], hash64) + data2 := []byte{} + data2 = append(data2, a.key[:]...) + data2 = append(data2, GroupKey(addr)...) + data2 = append(data2, hashbuf[:]...) + + hash2 := DoubleSha256(data2) + return int(binary.LittleEndian.Uint64(hash2) % oldBucketCount) +} + + +///// LOCAL ADDRESS + +// addressPrio is an enum type used to describe the heirarchy of local address +// discovery methods. +type addressPrio int + +const ( + InterfacePrio addressPrio = iota // address of local interface. + BoundPrio // Address explicitly bound to. + UpnpPrio // External IP discovered from UPnP + HttpPrio // Obtained from internet service. + ManualPrio // provided by --externalip. +) + +type localAddress struct { + Addr *NetAddress + Score addressPrio +} + +// addLocalAddress adds addr to the list of known local addresses to advertise +// with the given priority. +func (a *AddrManager) addLocalAddress(addr *NetAddress, priority addressPrio) { + // sanity check. + if !addr.Routable() { + amgrLog.Debugf("rejecting address %s:%d due to routability", addr.IP, addr.Port) + return + } + amgrLog.Debugf("adding address %s:%d", addr.IP, addr.Port) + + key := addr.String() + la, ok := a.localAddresses[key] + if !ok || la.Score < priority { + if ok { + la.Score = priority + 1 + } else { + a.localAddresses[key] = &localAddress{ + Addr: addr, + Score: priority, + } + } + } +} + +// getBestLocalAddress returns the most appropriate local address that we know +// of to be contacted by rna (remote net address) +func (a *AddrManager) getBestLocalAddress(rna *NetAddress) *NetAddress { + bestReach := 0 + var bestScore addressPrio + var bestAddr *NetAddress + for _, la := range a.localAddresses { + reach := rna.ReachabilityTo(la.Addr) + if reach > bestReach || + (reach == bestReach && la.Score > bestScore) { + bestReach = reach + bestScore = la.Score + bestAddr = la.Addr + } + } + if bestAddr != nil { + amgrLog.Debugf("Suggesting address %s:%d for %s:%d", + bestAddr.IP, bestAddr.Port, rna.IP, rna.Port) + } else { + amgrLog.Debugf("No worthy address for %s:%d", + rna.IP, rna.Port) + // Send something unroutable if nothing suitable. + bestAddr = &NetAddress{ + IP: net.IP([]byte{0, 0, 0, 0}), + Port: 0, + } + } + + return bestAddr +} + + +// Return a string representing the network group of this address. +// This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string +// "local" for a local address and the string "unroutable for an unroutable +// address. +func GroupKey (na *NetAddress) string { + if na.Local() { + return "local" + } + if !na.Routable() { + return "unroutable" + } + + if ipv4 := na.IP.To4(); ipv4 != nil { + return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String() + } + if na.RFC6145() || na.RFC6052() { + // last four bytes are the ip address + ip := net.IP(na.IP[12:16]) + return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() + } + + if na.RFC3964() { + ip := net.IP(na.IP[2:7]) + return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() + + } + if na.RFC4380() { + // teredo tunnels have the last 4 bytes as the v4 address XOR + // 0xff. + ip := net.IP(make([]byte, 4)) + for i, byte := range na.IP[12:16] { + ip[i] = byte ^ 0xff + } + return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() + } + + // OK, so now we know ourselves to be a IPv6 address. + // bitcoind uses /32 for everything, except for Hurricane Electric's + // (he.net) IP range, which it uses /36 for. + bits := 32 + heNet := &net.IPNet{IP: net.ParseIP("2001:470::"), + Mask: net.CIDRMask(32, 128)} + if heNet.Contains(na.IP) { + bits = 36 + } + + return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String() +} diff --git a/peer/knownaddress.go b/peer/knownaddress.go new file mode 100644 index 00000000..32685587 --- /dev/null +++ b/peer/knownaddress.go @@ -0,0 +1,80 @@ +package peer + +import ( + . "github.com/tendermint/tendermint/binary" + "time" +) + +/* + KnownAddress + + tracks information about a known network address that is used + to determine how viable an address is. +*/ +type KnownAddress struct { + Addr *NetAddress + Src *NetAddress + Attempts UInt32 + LastAttempt UInt64 + LastSuccess UInt64 + NewRefs UInt16 + OldBucket Int16 // TODO init to -1 +} + +func NewKnownAddress(addr *NetAddress, src *NetAddress) *KnownAddress { + return &KnownAddress{ + Addr: addr, + Src: src, + OldBucket: -1, + LastAttempt: UInt64(time.Now().Unix()), + Attempts: 0, + } +} + +func (ka *KnownAddress) MarkAttempt(success bool) { + now := UInt64(time.Now().Unix()) + ka.LastAttempt = now + if success { + ka.LastSuccess = now + ka.Attempts = 0 + } else { + ka.Attempts += 1 + } +} + +/* + An address is bad if the address in question has not been tried in the last + minute and meets one of the following criteria: + + 1) It claims to be from the future + 2) It hasn't been seen in over a month + 3) It has failed at least three times and never succeeded + 4) It has failed ten times in the last week + + All addresses that meet these criteria are assumed to be worthless and not + worth keeping hold of. +*/ +func (ka *KnownAddress) Bad() bool { + // Has been attempted in the last minute --> good + if ka.LastAttempt < UInt64(time.Now().Add(-1 * time.Minute).Unix()) { + return false + } + + // Over a month old? + if ka.LastAttempt > UInt64(time.Now().Add(-1 * numMissingDays * time.Hour * 24).Unix()) { + return true + } + + // Never succeeded? + if ka.LastSuccess == 0 && ka.Attempts >= numRetries { + return true + } + + // Hasn't succeeded in too long? + if ka.LastSuccess < UInt64(time.Now().Add(-1*minBadDays*time.Hour*24).Unix()) && + ka.Attempts >= maxFailures { + return true + } + + return false +} diff --git a/peer/log.go b/peer/log.go new file mode 100644 index 00000000..07b2ed2b --- /dev/null +++ b/peer/log.go @@ -0,0 +1,7 @@ +package peer + +import ( + "github.com/tendermint/btclog" +) + +var amgrLog = btclog.Disabled diff --git a/peer/netaddress.go b/peer/netaddress.go new file mode 100644 index 00000000..a5c42c49 --- /dev/null +++ b/peer/netaddress.go @@ -0,0 +1,155 @@ +// Modified for Tendermint +// Originally Copyright (c) 2013-2014 Conformal Systems LLC. +// https://github.com/conformal/btcd/blob/master/LICENSE + +package peer + +import ( + . "github.com/tendermint/tendermint/binary" + "io" + "net" + "strconv" +) + +/* NetAddress */ + +type NetAddress struct { + IP net.IP + Port UInt16 +} + +func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress { + na := NetAddress{ + IP: ip, + Port: port, + } + return &na +} + +func NewNetAddress(addr net.Addr) *NetAddress { + tcpAddr, ok := addr.(*net.TCPAddr) + if !ok { panic("addr is not a net.TCPAddr") } + + na := NewNetAddressIPPort(tcpAddr.IP, UInt16(tcpAddr.Port)) + return na +} + +func ReadNetAddress(r io.Reader) *NetAddress { + return &NetAddress{ + IP: net.IP(ReadByteSlice(r)), + Port: ReadUInt16(r), + } +} + +func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(ByteSlice(na.IP.To16()), w, n, err) + n, err = WriteOnto(na.Port, w, n, err) + return +} + +func (na *NetAddress) String() string { + port := strconv.FormatUint(uint64(na.Port), 10) + addr := net.JoinHostPort(na.IP.String(), port) + return addr +} + +func (na *NetAddress) Routable() bool { + // TODO(oga) bitcoind doesn't include RFC3849 here, but should we? + return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() || + na.RFC4193() || na.RFC4843() || na.Local()) +} + +// For IPv4 these are either a 0 or all bits set address. For IPv6 a zero +// address or one that matches the RFC3849 documentation address format. +func (na *NetAddress) Valid() bool { + return na.IP != nil && !(na.IP.IsUnspecified() || na.RFC3849() || + na.IP.Equal(net.IPv4bcast)) +} + +func (na *NetAddress) Local() bool { + return na.IP.IsLoopback() || zero4.Contains(na.IP) +} + +func (na *NetAddress) ReachabilityTo(o *NetAddress) int { + const ( + Unreachable = 0 + Default = iota + Teredo + Ipv6_weak + Ipv4 + Ipv6_strong + Private + ) + if !na.Routable() { + return Unreachable + } else if na.RFC4380() { + if !o.Routable() { + return Default + } else if o.RFC4380() { + return Teredo + } else if o.IP.To4() != nil { + return Ipv4 + } else { // ipv6 + return Ipv6_weak + } + } else if na.IP.To4() != nil { + if o.Routable() && o.IP.To4() != nil { + return Ipv4 + } + return Default + } else /* ipv6 */ { + var tunnelled bool + // Is our v6 is tunnelled? + if o.RFC3964() || o.RFC6052() || o.RFC6145() { + tunnelled = true + } + if !o.Routable() { + return Default + } else if o.RFC4380() { + return Teredo + } else if o.IP.To4() != nil { + return Ipv4 + } else if tunnelled { + // only prioritise ipv6 if we aren't tunnelling it. + return Ipv6_weak + } + return Ipv6_strong + } +} + +// RFC1918: IPv4 Private networks (10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12) +// RFC3849: IPv6 Documentation address (2001:0DB8::/32) +// RFC3927: IPv4 Autoconfig (169.254.0.0/16) +// RFC3964: IPv6 6to4 (2002::/16) +// RFC4193: IPv6 unique local (FC00::/7) +// RFC4380: IPv6 Teredo tunneling (2001::/32) +// RFC4843: IPv6 ORCHID: (2001:10::/28) +// RFC4862: IPv6 Autoconfig (FE80::/64) +// RFC6052: IPv6 well known prefix (64:FF9B::/96) +// RFC6145: IPv6 IPv4 translated address ::FFFF:0:0:0/96 +var rfc1918_10 = net.IPNet{IP: net.ParseIP("10.0.0.0"), Mask: net.CIDRMask(8, 32)} +var rfc1918_192 = net.IPNet{IP: net.ParseIP("192.168.0.0"), Mask: net.CIDRMask(16, 32)} +var rfc1918_172 = net.IPNet{IP: net.ParseIP("172.16.0.0"), Mask: net.CIDRMask(12, 32)} +var rfc3849 = net.IPNet{IP: net.ParseIP("2001:0DB8::"), Mask: net.CIDRMask(32, 128)} +var rfc3927 = net.IPNet{IP: net.ParseIP("169.254.0.0"), Mask: net.CIDRMask(16, 32)} +var rfc3964 = net.IPNet{IP: net.ParseIP("2002::"), Mask: net.CIDRMask(16, 128)} +var rfc4193 = net.IPNet{IP: net.ParseIP("FC00::"), Mask: net.CIDRMask(7, 128)} +var rfc4380 = net.IPNet{IP: net.ParseIP("2001::"), Mask: net.CIDRMask(32, 128)} +var rfc4843 = net.IPNet{IP: net.ParseIP("2001:10::"), Mask: net.CIDRMask(28, 128)} +var rfc4862 = net.IPNet{IP: net.ParseIP("FE80::"), Mask: net.CIDRMask(64, 128)} +var rfc6052 = net.IPNet{IP: net.ParseIP("64:FF9B::"), Mask: net.CIDRMask(96, 128)} +var rfc6145 = net.IPNet{IP: net.ParseIP("::FFFF:0:0:0"), Mask: net.CIDRMask(96, 128)} +var zero4 = net.IPNet{IP: net.ParseIP("0.0.0.0"), Mask: net.CIDRMask(8, 32)} + +func (na *NetAddress) RFC1918() bool { return rfc1918_10.Contains(na.IP) || + rfc1918_192.Contains(na.IP) || + rfc1918_172.Contains(na.IP) } +func (na *NetAddress) RFC3849() bool { return rfc3849.Contains(na.IP) } +func (na *NetAddress) RFC3927() bool { return rfc3927.Contains(na.IP) } +func (na *NetAddress) RFC3964() bool { return rfc3964.Contains(na.IP) } +func (na *NetAddress) RFC4193() bool { return rfc4193.Contains(na.IP) } +func (na *NetAddress) RFC4380() bool { return rfc4380.Contains(na.IP) } +func (na *NetAddress) RFC4843() bool { return rfc4843.Contains(na.IP) } +func (na *NetAddress) RFC4862() bool { return rfc4862.Contains(na.IP) } +func (na *NetAddress) RFC6052() bool { return rfc6052.Contains(na.IP) } +func (na *NetAddress) RFC6145() bool { return rfc6145.Contains(na.IP) } diff --git a/peer/util.go b/peer/util.go new file mode 100644 index 00000000..e4d7fbfb --- /dev/null +++ b/peer/util.go @@ -0,0 +1,15 @@ +package peer + +import ( + "crypto/sha256" +) + +// DoubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes. +func DoubleSha256(b []byte) []byte { + hasher := sha256.New() + hasher.Write(b) + sum := hasher.Sum(nil) + hasher.Reset() + hasher.Write(sum) + return hasher.Sum(nil) +}