working multitau

This commit is contained in:
Pieter Wuille 2011-12-23 02:43:32 +01:00
parent 8286cf33f1
commit 8b2c4eca5f
6 changed files with 168 additions and 121 deletions

View File

@ -19,7 +19,6 @@ class CNode {
unsigned int nHeaderStart;
unsigned int nMessageStart;
int nVersion;
int nRecvVersion;
string strSubVer;
int nStartingHeight;
vector<CAddress> *vAddr;
@ -82,11 +81,6 @@ class CNode {
}
void GotVersion() {
if (nVersion < MIN_VERSION) {
printf("%s: BAD (version %i is below %i)\n", ToString(you).c_str(), nVersion, MIN_VERSION);
ban = 1000000;
return;
}
printf("%s: version %i\n", ToString(you).c_str(), nVersion);
BeginMessage("getaddr");
EndMessage();
@ -109,12 +103,6 @@ class CNode {
if (nVersion >= 209 && !vRecv.empty())
vRecv >> nStartingHeight;
if (!(you.nServices & NODE_NETWORK)) {
printf("%s: BAD (no NODE_NETWORK)\n", ToString(you).c_str());
ban = 1000000;
return true;
}
if (nVersion >= 209) {
BeginMessage("verack");
EndMessage();
@ -204,7 +192,7 @@ class CNode {
}
public:
CNode(const CIPPort& ip, vector<CAddress>& vAddrIn) : you(ip), nHeaderStart(-1), nMessageStart(-1), vAddr(&vAddrIn), ban(0), doneAfter(0) {
CNode(const CIPPort& ip, vector<CAddress>& vAddrIn) : you(ip), nHeaderStart(-1), nMessageStart(-1), vAddr(&vAddrIn), ban(0), doneAfter(0), nVersion(0) {
vSend.SetType(SER_NETWORK);
vSend.SetVersion(0);
vRecv.SetType(SER_NETWORK);
@ -264,9 +252,13 @@ public:
int GetBan() {
return ban;
}
int GetClientVersion() {
return nVersion;
}
};
bool TestNode(const CIPPort &cip, int &ban, vector<CAddress>& vAddr) {
bool TestNode(const CIPPort &cip, int &ban, int &clientV, vector<CAddress>& vAddr) {
CNode node(cip, vAddr);
bool ret = node.Run();
if (!ret) {
@ -274,6 +266,7 @@ bool TestNode(const CIPPort &cip, int &ban, vector<CAddress>& vAddr) {
} else {
ban = 0;
}
clientV = node.GetClientVersion();
// printf("%s: %s!!!\n", cip.ToString().c_str(), ret ? "GOOD" : "BAD");
return ret;
}

View File

@ -3,6 +3,6 @@
#include "protocol.h"
bool TestNode(const CIPPort &cip, int &ban, std::vector<CAddress>& vAddr);
bool TestNode(const CIPPort &cip, int &ban, int &client, std::vector<CAddress>& vAddr);
#endif

113
db.cpp
View File

@ -4,48 +4,75 @@
using namespace std;
void CAddrInfo::Update(bool good) {
uint32_t now = time(NULL);
if (ourLastTry == 0)
ourLastTry = now - MIN_RETRY;
lastTry = now;
ourLastTry = now;
total++;
if (good) success++;
printf("%s: got %s result: weight=%g reliability=%g avgage=%g count=%g success=%i/%i\n", ToString(ip).c_str(), good ? "good" : "bad", weight, GetReliability(), GetAvgAge(), GetCount(), success, total);
uint32_t now = time(NULL);
if (ourLastTry == 0)
ourLastTry = now - MIN_RETRY;
int age = now - ourLastTry;
lastTry = now;
ourLastTry = now;
total++;
if (good) success++;
stat2H.Update(good, age, 3600*2);
stat8H.Update(good, age, 3600*8);
stat1D.Update(good, age, 3600*24);
stat1W.Update(good, age, 3600*24*7);
int ign = GetIgnoreTime();
if (ign && (ignoreTill==0 || ignoreTill < ign+now)) ignoreTill = ign+now;
printf("%s: got %s result: success=%i/%i; 2H:%.2f%%-%.2f%%(%.2f) 8H:%.2f%%-%.2f%%(%.2f) 1D:%.2f%%-%.2f%%(%.2f) 1W:%.2f%%-%.2f%%(%.2f) \n", ToString(ip).c_str(), good ? "good" : "bad", success, total,
100.0 * stat2H.reliability, 100.0 * (stat2H.reliability + 1.0 - stat2H.weight), stat2H.count,
100.0 * stat8H.reliability, 100.0 * (stat8H.reliability + 1.0 - stat8H.weight), stat8H.count,
100.0 * stat1D.reliability, 100.0 * (stat1D.reliability + 1.0 - stat1D.weight), stat1D.count,
100.0 * stat1W.reliability, 100.0 * (stat1W.reliability + 1.0 - stat1W.weight), stat1W.count);
}
bool CAddrDb::Get_(CIPPort &ip, int &wait) {
int64 now = time(NULL);
int cont = 0;
int tot = unkId.size();
deque<int>::iterator it = ourId.begin();
while (it < ourId.end()) {
if (now - idToInfo[*it].ourLastTry > MIN_RETRY) {
tot++;
it++;
do {
deque<int>::iterator it = ourId.begin();
while (it < ourId.end()) {
if (now - idToInfo[*it].ourLastTry > MIN_RETRY) {
tot++;
it++;
} else {
break;
}
}
if (tot == 0) {
if (ourId.size() > 0) {
wait = MIN_RETRY - (now - idToInfo[ourId.front()].ourLastTry);
} else {
wait = 5;
}
return false;
}
int rnd = rand() % tot;
int ret;
if (rnd < unkId.size()) {
if (rnd*10 < unkId.size()) {
// once every 10 attempts, restart with the oldest unknown IP
set<int>::iterator it = unkId.begin();
ret = *it;
} else {
// 90% of the time try the last learned IP
set<int>::reverse_iterator it = unkId.rbegin();
ret = *it;
}
unkId.erase(ret);
} else {
int ret = ourId.front();
if (time(NULL) - idToInfo[ret].ourLastTry < MIN_RETRY) return false;
ourId.pop_front();
}
if (idToInfo[ret].ignoreTill && idToInfo[ret].ignoreTill < now) {
ourId.push_back(ret);
} else {
ip = idToInfo[ret].ip;
break;
}
}
if (tot == 0) {
if (ourId.size() > 0) {
wait = MIN_RETRY - (now - idToInfo[ourId.front()].ourLastTry);
}
return false;
}
int rnd = rand() % tot;
if (rnd < unkId.size()) {
set<int>::reverse_iterator it = unkId.rbegin();
ip = idToInfo[*it].ip;
unkId.erase(*it);
printf("%s: new node\n", ToString(ip).c_str());
} else {
int ret = ourId.front();
if (time(NULL) - idToInfo[ret].ourLastTry < MIN_RETRY) return false;
ourId.pop_front();
ip = idToInfo[ret].ip;
printf("%s: old node\n", ToString(ip).c_str());
}
fDirty = true;
} while(1);
nDirty++;
return true;
}
@ -55,18 +82,19 @@ int CAddrDb::Lookup_(const CIPPort &ip) {
return -1;
}
void CAddrDb::Good_(const CIPPort &addr) {
void CAddrDb::Good_(const CIPPort &addr, int clientV) {
int id = Lookup_(addr);
if (id == -1) return;
unkId.erase(id);
banned.erase(addr);
CAddrInfo &info = idToInfo[id];
info.clientVersion = clientV;
info.Update(true);
if (info.IsGood() && goodId.count(id)==0) {
goodId.insert(id);
printf("%s: good; %i good nodes now\n", ToString(addr).c_str(), (int)goodId.size());
}
fDirty = true;
nDirty++;
ourId.push_back(id);
}
@ -78,9 +106,10 @@ void CAddrDb::Bad_(const CIPPort &addr, int ban)
CAddrInfo &info = idToInfo[id];
info.Update(false);
uint32_t now = time(NULL);
if (info.IsTerrible()) {
int ter = info.GetBanTime();
if (ter) {
printf("%s: terrible\n", ToString(addr).c_str());
if (ban < 604800) ban = 604800;
if (ban < ter) ban = ter;
}
if (ban > 0) {
printf("%s: ban for %i seconds\n", ToString(addr).c_str(), ban);
@ -95,7 +124,7 @@ void CAddrDb::Bad_(const CIPPort &addr, int ban)
}
ourId.push_back(id);
}
fDirty = true;
nDirty++;
}
void CAddrDb::Skipped_(const CIPPort &addr)
@ -105,7 +134,7 @@ void CAddrDb::Skipped_(const CIPPort &addr)
unkId.erase(id);
ourId.push_back(id);
printf("%s: skipped\n", ToString(addr).c_str());
fDirty = true;
nDirty++;
}
@ -135,8 +164,6 @@ void CAddrDb::Add_(const CAddress &addr) {
ai.services = addr.nServices;
ai.lastTry = addr.nTime;
ai.ourLastTry = 0;
ai.reliability = 0;
ai.weight = 0;
ai.total = 0;
ai.success = 0;
int id = nId++;
@ -144,7 +171,7 @@ void CAddrDb::Add_(const CAddress &addr) {
ipToId[ipp] = id;
printf("%s: added\n", ToString(ipp).c_str(), ipToId[ipp]);
unkId.insert(id);
fDirty = true;
nDirty++;
}
void CAddrDb::GetIPs_(set<CIP>& ips, int max, bool fOnlyIPv4) {

140
db.h
View File

@ -10,7 +10,6 @@
#include "protocol.h"
#include "util.h"
#define TAU 86400.0
#define MIN_RETRY 1000
std::string static inline ToString(const CIPPort &ip) {
@ -19,17 +18,29 @@ std::string static inline ToString(const CIPPort &ip) {
return str;
}
template<float tau> class CAddrStat {
class CAddrStat {
private:
float reliability;
float timing;
float count;
float weight;
float count;
float reliability;
public:
void Update(bool good, int64 tim) {
CAddrStat() : weight(0), count(0), reliability(0) {}
void Update(bool good, int64 age, double tau) {
double f = exp(-age/tau);
reliability = reliability * f + (good ? (1.0-f) : 0);
count = count * f + 1;
weight = weight * f + (1.0-f);
}
}
IMPLEMENT_SERIALIZE (
READWRITE(weight);
READWRITE(count);
READWRITE(reliability);
)
friend class CAddrInfo;
};
class CAddrInfo {
private:
@ -37,39 +48,67 @@ private:
uint64_t services;
int64 lastTry;
int64 ourLastTry;
double reliability;
double timing;
double weight;
double count;
int64 ignoreTill;
CAddrStat stat2H;
CAddrStat stat8H;
CAddrStat stat1D;
CAddrStat stat1W;
int clientVersion;
int total;
int success;
public:
double GetCount() const { return count; }
double GetAvgAge() const { return timing/weight; }
double GetReliability() const { return reliability/weight; }
CAddrInfo() : services(0), lastTry(0), ourLastTry(0), ignoreTill(0), clientVersion(0), total(0), success(0) {}
bool IsGood() {
return (weight > 0 && GetReliability() > 0.8 && GetAvgAge() < 86400 && ip.GetPort() == 8333 && ip.IsRoutable());
if (ip.GetPort() != 8333) return false;
if (!(services & NODE_NETWORK)) return false;
if (!ip.IsRoutable()) return false;
if (!ip.IsIPv4()) return false;
if (clientVersion && clientVersion < 32400) return false;
if (total <= 3 && success * 2 >= total) return true;
if (stat2H.reliability > 0.7 && stat2H.count > 3) return true;
if (stat8H.reliability > 0.6 && stat8H.count > 6) return true;
if (stat1D.reliability > 0.5 && stat1D.count > 12) return true;
if (stat1W.reliability > 0.4 && stat1W.count > 24) return true;
return false;
}
bool IsTerrible() {
return ((weight > 0.1 && GetCount() > 5 && GetReliability() < 0.05) || (weight > 0.5 && GetReliability() < 0.2 && GetAvgAge() > 7200 && GetCount() > 5));
int GetBanTime() {
if (IsGood()) return 0;
if (clientVersion && clientVersion < 31900) { return 1000000; }
if (stat1D.reliability < 0.01 && stat1D.count > 5) { return 500000; }
if (stat1W.reliability - stat1W.weight + 1.0 < 0.10 && stat1W.count > 4) { return 240*3600; }
return 0;
}
int GetIgnoreTime() {
if (IsGood()) return 0;
if (stat2H.reliability - stat2H.weight + 1.0 < 0.2 && stat2H.count > 3) { return 3*3600; }
if (stat8H.reliability - stat8H.weight + 1.0 < 0.2 && stat8H.count > 6) { return 12*3600; }
if (stat1D.reliability - stat1D.weight + 1.0 < 0.2 && stat1D.count > 9) { return 36*3600; }
return 0;
}
void Update(bool good);
friend class CAddrDb;
IMPLEMENT_SERIALIZE (
int version = 0;
unsigned char version = 0;
READWRITE(version);
READWRITE(ip);
READWRITE(services);
READWRITE(lastTry);
READWRITE(ourLastTry);
READWRITE(reliability);
READWRITE(timing);
READWRITE(weight);
READWRITE(count);
READWRITE(ignoreTill);
READWRITE(stat2H);
READWRITE(stat8H);
READWRITE(stat1D);
READWRITE(stat1W);
READWRITE(total);
READWRITE(success);
READWRITE(clientVersion);
)
};
@ -91,13 +130,13 @@ private:
std::set<int> unkId; // set of nodes not yet tried (b)
std::set<int> goodId; // set of good nodes (d, good e)
std::map<CIPPort, time_t> banned; // nodes that are banned, with their unban time (a)
bool fDirty;
int nDirty;
protected:
// internal routines that assume proper locks are acquired
void Add_(const CAddress &addr); // add an address
bool Get_(CIPPort &ip, int& wait); // get an IP to test (must call Good_, Bad_, or Skipped_ on result afterwards)
void Good_(const CIPPort &ip); // mark an IP as good (must have been returned by Get_)
void Good_(const CIPPort &ip, int clientV); // mark an IP as good (must have been returned by Get_)
void Bad_(const CIPPort &ip, int ban); // mark an IP as bad (and optionally ban it) (must have been returned by Get_)
void Skipped_(const CIPPort &ip); // mark an IP as skipped (must have been returned by Get_)
int Lookup_(const CIPPort &ip); // look up id of an IP
@ -105,13 +144,11 @@ protected:
public:
// seriazlization code
// serialization code
// format:
// nVersion (0 for now)
// nOur (number of ips in (c,d))
// nUnk (number of ips in (b))
// CAddrInfo[nOur]
// CAddrInfo[nUnk]
// n (number of ips in (b,c,d))
// CAddrInfo[n]
// banned
// acquires a shared lock (this does not suffice for read mode, but we assume that only happens at startup, single-threaded)
// this way, dumping does not interfere with GetIPs_, which is called from the DNS thread
@ -121,10 +158,8 @@ public:
SHARED_CRITICAL_BLOCK(cs) {
if (fWrite) {
CAddrDb *db = const_cast<CAddrDb*>(this);
int nOur = ourId.size();
int nUnk = unkId.size();
READWRITE(nOur);
READWRITE(nUnk);
int n = ourId.size() + unkId.size();
READWRITE(n);
for (std::deque<int>::const_iterator it = ourId.begin(); it != ourId.end(); it++) {
std::map<int, CAddrInfo>::iterator ci = db->idToInfo.find(*it);
READWRITE((*ci).second);
@ -136,31 +171,24 @@ public:
} else {
CAddrDb *db = const_cast<CAddrDb*>(this);
db->nId = 0;
int nOur, nUnk;
READWRITE(nOur);
READWRITE(nUnk);
for (int i=0; i<nOur; i++) {
int n;
READWRITE(n);
for (int i=0; i<n; i++) {
CAddrInfo info;
READWRITE(info);
if (!info.IsTerrible()) {
if (!info.GetBanTime()) {
int id = db->nId++;
db->idToInfo[id] = info;
db->ipToId[info.ip] = id;
db->ourId.push_back(id);
if (info.IsGood()) db->goodId.insert(id);
if (info.ourLastTry) {
db->ourId.push_back(id);
if (info.IsGood()) db->goodId.insert(id);
} else {
db->unkId.insert(id);
}
}
}
for (int i=0; i<nUnk; i++) {
CAddrInfo info;
READWRITE(info);
if (!info.IsTerrible()) {
int id = db->nId++;
db->idToInfo[id] = info;
db->ipToId[info.ip] = id;
db->unkId.insert(id);
}
}
db->fDirty = true;
db->nDirty++;
}
READWRITE(banned);
}
@ -169,7 +197,7 @@ public:
// print statistics
void Stats() {
SHARED_CRITICAL_BLOCK(cs) {
if (fDirty) {
if (nDirty > 50) {
printf("**** %i available (%i tracked, %i new, %i active), %i banned; %i good\n",
(int)idToInfo.size(),
(int)ourId.size(),
@ -177,7 +205,7 @@ public:
(int)idToInfo.size() - (int)ourId.size() - (int)unkId.size(),
(int)banned.size(),
(int)goodId.size());
fDirty = false; // hopefully atomic
nDirty = 0; // hopefully atomic
}
}
}
@ -190,9 +218,9 @@ public:
for (int i=0; i<vAddr.size(); i++)
Add_(vAddr[i]);
}
void Good(const CIPPort &addr) {
void Good(const CIPPort &addr, int clientVersion) {
CRITICAL_BLOCK(cs)
Good_(addr);
Good_(addr, clientVersion);
}
void Skipped(const CIPPort &addr) {
CRITICAL_BLOCK(cs)

View File

@ -3,7 +3,7 @@
#include "bitcoin.h"
#include "db.h"
#define NTHREADS 100
#define NTHREADS 16
using namespace std;
@ -26,10 +26,11 @@ extern "C" void* ThreadCrawler(void* data) {
}
int ban = 0;
vector<CAddress> addr;
bool ret = TestNode(ip,ban,addr);
int clientV = 0;
bool ret = TestNode(ip,ban,clientV,addr);
db.Add(addr);
if (ret) {
db.Good(ip);
db.Good(ip, clientV);
} else {
db.Bad(ip, ban);
}
@ -81,8 +82,6 @@ extern "C" void* ThreadDumper(void*) {
} while(1);
}
#define NTHREADS 100
int main(void) {
FILE *f = fopen("dnsseed.dat","r");
if (f) {

View File

@ -224,7 +224,7 @@ bool CIPPort::ConnectSocket(SOCKET& hSocketRet, int nTimeout) const
}
if (nRet == SOCKET_ERROR)
{
printf("select() for connection failed: %i\n",WSAGetLastError());
printf("select() for connection failed: %s\n",strerror(WSAGetLastError()));
closesocket(hSocket);
return false;
}
@ -235,7 +235,7 @@ bool CIPPort::ConnectSocket(SOCKET& hSocketRet, int nTimeout) const
if (getsockopt(hSocket, SOL_SOCKET, SO_ERROR, &nRet, &nRetSize) == SOCKET_ERROR)
#endif
{
printf("getsockopt() for connection failed: %i\n",WSAGetLastError());
printf("getsockopt() for connection failed: %s\n",strerror(WSAGetLastError()));
closesocket(hSocket);
return false;
}