eth/downloader: fix priority queue reset, add throttling test

This commit is contained in:
Péter Szilágyi 2015-05-07 14:40:50 +03:00
parent 45f8304f3c
commit 43901c9282
6 changed files with 111 additions and 21 deletions

2
Godeps/Godeps.json generated
View File

@ -100,7 +100,7 @@
}, },
{ {
"ImportPath": "gopkg.in/karalabe/cookiejar.v2/collections/prque", "ImportPath": "gopkg.in/karalabe/cookiejar.v2/collections/prque",
"Rev": "cf5d8079df7c4501217638e1e3a6e43f94822548" "Rev": "0b2e270613f5d7ba262a5749b9e32270131497a2"
}, },
{ {
"ImportPath": "gopkg.in/qml.v1/cdata", "ImportPath": "gopkg.in/qml.v1/cdata",

View File

@ -71,5 +71,5 @@ func (p *Prque) Size() int {
// Clears the contents of the priority queue. // Clears the contents of the priority queue.
func (p *Prque) Reset() { func (p *Prque) Reset() {
p.cont.Reset() *p = *New()
} }

View File

@ -61,16 +61,45 @@ func TestPrque(t *testing.T) {
} }
func TestReset(t *testing.T) { func TestReset(t *testing.T) {
// Fill the queue with some random data // Generate a batch of random data and a specific priority order
size := 16 * blockSize size := 16 * blockSize
queue := New() prio := rand.Perm(size)
data := make([]int, size)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
queue.Push(rand.Int(), rand.Float32()) data[i] = rand.Int()
} }
// Reset and ensure it's empty queue := New()
queue.Reset() for rep := 0; rep < 2; rep++ {
if !queue.Empty() { // Fill a priority queue with the above data
t.Errorf("priority queue not empty after reset: %v", queue) for i := 0; i < size; i++ {
queue.Push(data[i], float32(prio[i]))
if queue.Size() != i+1 {
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
}
}
// Create a map the values to the priorities for easier verification
dict := make(map[float32]int)
for i := 0; i < size; i++ {
dict[float32(prio[i])] = data[i]
}
// Pop out half the elements in priority order and verify them
prevPrio := float32(size + 1)
for i := 0; i < size/2; i++ {
val, prio := queue.Pop()
if prio > prevPrio {
t.Errorf("invalid priority order: %v after %v.", prio, prevPrio)
}
prevPrio = prio
if val != dict[prio] {
t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio])
}
delete(dict, prio)
}
// Reset and ensure it's empty
queue.Reset()
if !queue.Empty() {
t.Errorf("priority queue not empty after reset: %v", queue)
}
} }
} }

View File

@ -88,7 +88,7 @@ func (s *sstack) Less(i, j int) bool {
return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority
} }
// Swapts two elements in the stack. Required by sort.Interface. // Swaps two elements in the stack. Required by sort.Interface.
func (s *sstack) Swap(i, j int) { func (s *sstack) Swap(i, j int) {
ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io] s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io]
@ -96,8 +96,5 @@ func (s *sstack) Swap(i, j int) {
// Resets the stack, effectively clearing its contents. // Resets the stack, effectively clearing its contents.
func (s *sstack) Reset() { func (s *sstack) Reset() {
s.size = 0 *s = *newSstack()
s.offset = 0
s.active = s.blocks[0]
s.capacity = blockSize
} }

View File

@ -79,15 +79,31 @@ func TestSstackSort(t *testing.T) {
} }
func TestSstackReset(t *testing.T) { func TestSstackReset(t *testing.T) {
// Push some stuff onto the stack // Create some initial data
size := 16 * blockSize size := 16 * blockSize
stack := newSstack() data := make([]*item, size)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
stack.Push(&item{i, float32(i)}) data[i] = &item{rand.Int(), rand.Float32()}
} }
// Clear and verify stack := newSstack()
stack.Reset() for rep := 0; rep < 2; rep++ {
if stack.Len() != 0 { // Push all the data into the stack, pop out every second
t.Errorf("stack not empty after reset: %v", stack) secs := []*item{}
for i := 0; i < size; i++ {
stack.Push(data[i])
if i%2 == 0 {
secs = append(secs, stack.Pop().(*item))
}
}
// Reset and verify both pulled and stack contents
stack.Reset()
if stack.Len() != 0 {
t.Errorf("stack not empty after reset: %v", stack)
}
for i := 0; i < size; i++ {
if i%2 == 0 && data[i] != secs[i/2] {
t.Errorf("push/pop mismatch: have %v, want %v.", secs[i/2], data[i])
}
}
} }
} }

View File

@ -181,3 +181,51 @@ func TestTaking(t *testing.T) {
t.Error("expected to take 1000, got", len(bs1)) t.Error("expected to take 1000, got", len(bs1))
} }
} }
func TestThrottling(t *testing.T) {
minDesiredPeerCount = 4
blockTtl = 1 * time.Second
targetBlocks := 4 * blockCacheLimit
hashes := createHashes(0, targetBlocks)
blocks := createBlocksFromHashes(hashes)
tester := newTester(t, hashes, blocks)
tester.newPeer("peer1", big.NewInt(10000), hashes[0])
tester.newPeer("peer2", big.NewInt(0), common.Hash{})
tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
// Concurrently download and take the blocks
errc := make(chan error, 1)
go func() {
errc <- tester.sync("peer1", hashes[0])
}()
done := make(chan struct{})
took := []*types.Block{}
go func() {
for {
select {
case <-done:
took = append(took, tester.downloader.TakeBlocks()...)
done <- struct{}{}
return
default:
took = append(took, tester.downloader.TakeBlocks()...)
}
}
}()
// Synchronize the two threads and verify
err := <-errc
done <- struct{}{}
<-done
if err != nil {
t.Fatalf("failed to synchronize blocks: %v", err)
}
if len(took) != targetBlocks {
t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)
}
}