diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 3318879e2..5be09f37d 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -101,10 +100,9 @@ type queue struct { stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations - stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly - stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator - stateProcessors int32 // [eth/63] Number of currently running state processors - stateSchedLock sync.RWMutex // [eth/63] Lock serialising access to the state scheduler + stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly + stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator + stateWriters int // [eth/63] Number of running state DB writer goroutines resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain @@ -143,9 +141,6 @@ func (q *queue) Reset() { q.lock.Lock() defer q.lock.Unlock() - q.stateSchedLock.Lock() - defer q.stateSchedLock.Unlock() - q.closed = false q.mode = FullSync q.fastSyncPivot = 0 @@ -209,13 +204,24 @@ func (q *queue) PendingReceipts() int { // PendingNodeData retrieves the number of node data entries pending for retrieval. func (q *queue) PendingNodeData() int { - q.stateSchedLock.RLock() - defer q.stateSchedLock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() + return q.pendingNodeDataLocked() +} + +// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval. +// The caller must hold q.lock. +func (q *queue) pendingNodeDataLocked() int { + var n int if q.stateScheduler != nil { - return q.stateScheduler.Pending() + n = q.stateScheduler.Pending() } - return 0 + // Ensure that PendingNodeData doesn't return 0 until all state is written. + if q.stateWriters > 0 { + n++ + } + return n } // InFlightHeaders retrieves whether there are header fetch requests currently @@ -251,7 +257,7 @@ func (q *queue) InFlightNodeData() bool { q.lock.Lock() defer q.lock.Unlock() - return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 + return len(q.statePendPool)+q.stateWriters > 0 } // Idle returns if the queue is fully idle or has some data still inside. This @@ -264,12 +270,9 @@ func (q *queue) Idle() bool { pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) - q.stateSchedLock.RLock() if q.stateScheduler != nil { queued += q.stateScheduler.Pending() } - q.stateSchedLock.RUnlock() - return (queued + pending + cached) == 0 } @@ -398,9 +401,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear } - q.stateSchedLock.Lock() q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) - q.stateSchedLock.Unlock() } inserts = append(inserts, header) q.headerHead = hash @@ -459,7 +460,7 @@ func (q *queue) countProcessableItems() int { // resultCache has space for fsHeaderForceVerify items. Not // doing this could leave us unable to download the required // amount of headers. - if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 { + if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 { return i } for j := 0; j < fsHeaderForceVerify; j++ { @@ -524,9 +525,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { // Create a task generator to fetch status-fetch tasks if all schedules ones are done generator := func(max int) { - q.stateSchedLock.Lock() - defer q.stateSchedLock.Unlock() - if q.stateScheduler != nil { for _, hash := range q.stateScheduler.Missing(max) { q.stateTaskPool[hash] = q.stateTaskIndex @@ -1068,7 +1066,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, boo } } // Iterate over the downloaded data and verify each of them - accepted, errs := 0, make([]error, 0) + errs := make([]error, 0) process := []trie.SyncResult{} for _, blob := range data { // Skip any state trie entries that were not requested @@ -1079,70 +1077,52 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, boo } // Inject the next state trie item into the processing queue process = append(process, trie.SyncResult{Hash: hash, Data: blob}) - accepted++ - delete(request.Hashes, hash) delete(q.stateTaskPool, hash) } - // Start the asynchronous node state data injection - atomic.AddInt32(&q.stateProcessors, 1) - go func() { - defer atomic.AddInt32(&q.stateProcessors, -1) - q.deliverNodeData(process, callback) - }() // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { q.stateTaskQueue.Push(hash, float32(index)) } + if q.stateScheduler == nil { + return 0, errNoFetchesPending + } + + // Run valid nodes through the trie download scheduler. It writes completed nodes to a + // batch, which is committed asynchronously. This may lead to over-fetches because the + // scheduler treats everything as written after Process has returned, but it's + // unlikely to be an issue in practice. + batch := q.stateDatabase.NewBatch() + progressed, nproc, procerr := q.stateScheduler.Process(process, batch) + q.stateWriters += 1 + go func() { + if procerr == nil { + nproc = len(process) + procerr = batch.Write() + } + // Return processing errors through the callback so the sync gets canceled. The + // number of writers is decremented prior to the call so PendingNodeData will + // return zero when the callback runs. + q.lock.Lock() + q.stateWriters -= 1 + q.lock.Unlock() + callback(nproc, progressed, procerr) + // Wake up WaitResults after the state has been written because it might be + // waiting for completion of the pivot block's state download. + q.active.Signal() + }() + // If none of the data items were good, it's a stale delivery switch { case len(errs) == 0: - return accepted, nil + return len(process), nil case len(errs) == len(request.Hashes): - return accepted, errStaleDelivery + return len(process), errStaleDelivery default: - return accepted, fmt.Errorf("multiple failures: %v", errs) + return len(process), fmt.Errorf("multiple failures: %v", errs) } } -// deliverNodeData is the asynchronous node data processor that injects a batch -// of sync results into the state scheduler. -func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) { - // Wake up WaitResults after the state has been written because it - // might be waiting for the pivot block state to get completed. - defer q.active.Signal() - - // Process results one by one to permit task fetches in between - progressed := false - for i, result := range results { - q.stateSchedLock.Lock() - - if q.stateScheduler == nil { - // Syncing aborted since this async delivery started, bail out - q.stateSchedLock.Unlock() - callback(i, progressed, errNoFetchesPending) - return - } - - batch := q.stateDatabase.NewBatch() - prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}, batch) - if err != nil { - q.stateSchedLock.Unlock() - callback(i, progressed, err) - return - } - if err = batch.Write(); err != nil { - q.stateSchedLock.Unlock() - callback(i, progressed, err) - return // TODO(karalabe): If a DB write fails (disk full), we ought to cancel the sync - } - // Item processing succeeded, release the lock (temporarily) - progressed = progressed || prog - q.stateSchedLock.Unlock() - } - callback(len(results), progressed, nil) -} - // Prepare configures the result cache to allow accepting and caching inbound // fetch results. func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {