From 0f7fbb85d6e939510a3e3bb6493a9a332ddfd8e8 Mon Sep 17 00:00:00 2001
From: gary rong
Date: Mon, 27 Nov 2017 19:34:17 +0800
Subject: [PATCH] trie: make fullnode children hash calculation concurrently
 (#15131)

* trie: make fullnode children hash calculation concurrently

* trie: thread out only on topmost fullnode

* trie: clean up full node children hash calculation

* trie: minor code fixups
---
 trie/hasher.go      | 111 +++++++++++++++++++++++++++++++++-----------
 trie/secure_trie.go |   8 ++--
 trie/trie.go        |   1 -
 3 files changed, 88 insertions(+), 32 deletions(-)

diff --git a/trie/hasher.go b/trie/hasher.go
index 4719aabf6..5186d7669 100644
--- a/trie/hasher.go
+++ b/trie/hasher.go
@@ -26,27 +26,46 @@ import (
 	"github.com/ethereum/go-ethereum/rlp"
 )
 
-type hasher struct {
-	tmp                  *bytes.Buffer
-	sha                  hash.Hash
-	cachegen, cachelimit uint16
+// calculator is a utility used by the hasher to calculate the hash value of the tree node.
+type calculator struct {
+	sha    hash.Hash
+	buffer *bytes.Buffer
 }
 
-// hashers live in a global pool.
-var hasherPool = sync.Pool{
+// calculatorPool is a set of temporary calculators that may be individually saved and retrieved.
+var calculatorPool = sync.Pool{
 	New: func() interface{} {
-		return &hasher{tmp: new(bytes.Buffer), sha: sha3.NewKeccak256()}
+		return &calculator{buffer: new(bytes.Buffer), sha: sha3.NewKeccak256()}
 	},
 }
 
+// hasher is used to calculate the hash value of the whole tree.
+type hasher struct {
+	cachegen   uint16
+	cachelimit uint16
+	threaded   bool
+	mu         sync.Mutex
+}
+
 func newHasher(cachegen, cachelimit uint16) *hasher {
-	h := hasherPool.Get().(*hasher)
-	h.cachegen, h.cachelimit = cachegen, cachelimit
+	h := &hasher{
+		cachegen:   cachegen,
+		cachelimit: cachelimit,
+	}
 	return h
 }
 
-func returnHasherToPool(h *hasher) {
-	hasherPool.Put(h)
+// newCalculator retrieves a cleaned calculator from the calculator pool.
+func (h *hasher) newCalculator() *calculator {
+	calculator := calculatorPool.Get().(*calculator)
+	calculator.buffer.Reset()
+	calculator.sha.Reset()
+	return calculator
+}
+
+// returnCalculator returns a no longer used calculator to the pool.
+func (h *hasher) returnCalculator(calculator *calculator) {
+	calculatorPool.Put(calculator)
 }
 
 // hash collapses a node down into a hash node, also returning a copy of the
@@ -123,15 +142,48 @@ func (h *hasher) hashChildren(original node, db DatabaseWriter) (node, node, err
 		// Hash the full node's children, caching the newly hashed subtrees
 		collapsed, cached := n.copy(), n.copy()
 
-		for i := 0; i < 16; i++ {
-			if n.Children[i] != nil {
-				collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
-				if err != nil {
-					return original, original, err
-				}
-			} else {
-				collapsed.Children[i] = valueNode(nil) // Ensure that nil children are encoded as empty strings.
+		// hashChild is a helper to hash a single child, which is called either on the
+		// same thread as the caller or in a goroutine for the toplevel branching.
+		hashChild := func(index int, wg *sync.WaitGroup) {
+			if wg != nil {
+				defer wg.Done()
 			}
+			// Ensure that nil children are encoded as empty strings.
+			if collapsed.Children[index] == nil {
+				collapsed.Children[index] = valueNode(nil)
+				return
+			}
+			// Hash all other children properly
+			var herr error
+			collapsed.Children[index], cached.Children[index], herr = h.hash(n.Children[index], db, false)
+			if herr != nil {
+				h.mu.Lock() // rarely if ever locked, no congestion
+				err = herr
+				h.mu.Unlock()
+			}
+		}
+		// If we're not running in threaded mode yet, spawn a goroutine for each child
+		if !h.threaded {
+			// Disable further threading
+			h.threaded = true
+
+			// Hash all the children concurrently
+			var wg sync.WaitGroup
+			for i := 0; i < 16; i++ {
+				wg.Add(1)
+				go hashChild(i, &wg)
+			}
+			wg.Wait()
+
+			// Reenable threading for subsequent hash calls
+			h.threaded = false
+		} else {
+			for i := 0; i < 16; i++ {
+				hashChild(i, nil)
+			}
+		}
+		if err != nil {
+			return original, original, err
 		}
 		cached.Children[16] = n.Children[16]
 		if collapsed.Children[16] == nil {
@@ -150,24 +202,29 @@ func (h *hasher) store(n node, db DatabaseWriter, force bool) (node, error) {
 	if _, isHash := n.(hashNode); n == nil || isHash {
 		return n, nil
 	}
+	calculator := h.newCalculator()
+	defer h.returnCalculator(calculator)
+
 	// Generate the RLP encoding of the node
-	h.tmp.Reset()
-	if err := rlp.Encode(h.tmp, n); err != nil {
+	if err := rlp.Encode(calculator.buffer, n); err != nil {
 		panic("encode error: " + err.Error())
 	}
-
-	if h.tmp.Len() < 32 && !force {
+	if calculator.buffer.Len() < 32 && !force {
 		return n, nil // Nodes smaller than 32 bytes are stored inside their parent
 	}
 	// Larger nodes are replaced by their hash and stored in the database.
 	hash, _ := n.cache()
 	if hash == nil {
-		h.sha.Reset()
-		h.sha.Write(h.tmp.Bytes())
-		hash = hashNode(h.sha.Sum(nil))
+		calculator.sha.Write(calculator.buffer.Bytes())
+		hash = hashNode(calculator.sha.Sum(nil))
 	}
 	if db != nil {
-		return hash, db.Put(hash, h.tmp.Bytes())
+		// db might be a leveldb batch, which is not safe for concurrent writes
+		h.mu.Lock()
+		err := db.Put(hash, calculator.buffer.Bytes())
+		h.mu.Unlock()
+
+		return hash, err
 	}
 	return hash, nil
 }
diff --git a/trie/secure_trie.go b/trie/secure_trie.go
index 20c303f31..1fde45165 100644
--- a/trie/secure_trie.go
+++ b/trie/secure_trie.go
@@ -199,10 +199,10 @@ func (t *SecureTrie) secKey(key []byte) []byte {
 // invalid on the next call to hashKey or secKey.
 func (t *SecureTrie) hashKey(key []byte) []byte {
 	h := newHasher(0, 0)
-	h.sha.Reset()
-	h.sha.Write(key)
-	buf := h.sha.Sum(t.hashKeyBuf[:0])
-	returnHasherToPool(h)
+	calculator := h.newCalculator()
+	calculator.sha.Write(key)
+	buf := calculator.sha.Sum(t.hashKeyBuf[:0])
+	h.returnCalculator(calculator)
 	return buf
 }
 
diff --git a/trie/trie.go b/trie/trie.go
index 7f69a3d1d..c211e7554 100644
--- a/trie/trie.go
+++ b/trie/trie.go
@@ -501,6 +501,5 @@ func (t *Trie) hashRoot(db DatabaseWriter) (node, node, error) {
 		return hashNode(emptyRoot.Bytes()), nil, nil
 	}
 	h := newHasher(t.cachegen, t.cachelimit)
-	defer returnHasherToPool(h)
 	return h.hash(t.root, db, true)
 }
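
Note: the core of the hasher.go change is that goroutines are spawned only for the sixteen children of the topmost full node, while every level below it is hashed sequentially on whichever goroutine reached it, so a deep trie never fans out into an unbounded number of goroutines. The sketch below is not part of the patch; it is a minimal, self-contained Go illustration of that gating pattern, in which the node and hasher types, the sha256 digest and the way child hashes are combined are all invented for the example.

package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

// node is a toy stand-in for a trie full node: up to 16 children plus a value.
type node struct {
	children [16]*node
	value    []byte
}

// hasher keeps the same trick as the patch: a threaded flag that limits
// goroutine fan-out to the topmost branching node.
type hasher struct {
	threaded bool
}

// hash computes a toy hash of the subtree. The first (topmost) call processes
// the 16 children in parallel; every recursive call below it runs sequentially.
func (h *hasher) hash(n *node) []byte {
	if n == nil {
		return nil
	}
	var sums [16][]byte

	// hashChild fills exactly one slot, so the goroutines below never write
	// to shared memory and need no lock.
	hashChild := func(i int, wg *sync.WaitGroup) {
		if wg != nil {
			defer wg.Done()
		}
		sums[i] = h.hash(n.children[i])
	}
	if !h.threaded {
		h.threaded = true // disable further threading for the recursive calls
		var wg sync.WaitGroup
		for i := 0; i < 16; i++ {
			wg.Add(1)
			go hashChild(i, &wg)
		}
		wg.Wait()
		h.threaded = false // re-enable threading for the next top-level call
	} else {
		for i := 0; i < 16; i++ {
			hashChild(i, nil)
		}
	}
	// Combine the value and the child hashes into this node's hash.
	sum := sha256.New()
	sum.Write(n.value)
	for _, s := range sums {
		sum.Write(s)
	}
	return sum.Sum(nil)
}

func main() {
	root := &node{value: []byte("root")}
	for i := range root.children {
		root.children[i] = &node{value: []byte{byte(i)}}
	}
	fmt.Printf("%x\n", (&hasher{}).hash(root))
}

Each goroutine writes into its own slot (sums[i] here, collapsed.Children[index] and cached.Children[index] in the patch), which is why the children can be hashed without a lock; the real code only takes h.mu for the shared err variable and around db.Put, since the database writer may be a leveldb batch that is not safe for concurrent writes.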
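
Note: the second ingredient is that the per-hash scratch state (the Keccak-256 state and the RLP buffer) moves off the shared hasher into a pooled calculator, so the concurrent child hashes never touch the same hash.Hash or bytes.Buffer. The following is a rough sketch of that sync.Pool pattern, not the real API: it assumes the standard library's sha256 in place of Keccak-256, and a made-up digest helper standing in for the store method.

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"hash"
	"sync"
)

// calculator bundles the scratch state that one hashing operation needs.
type calculator struct {
	sha    hash.Hash
	buffer *bytes.Buffer
}

// calculatorPool hands out calculators so concurrent hashers never share one.
var calculatorPool = sync.Pool{
	New: func() interface{} {
		return &calculator{buffer: new(bytes.Buffer), sha: sha256.New()}
	},
}

// newCalculator fetches a calculator from the pool and resets it before use.
func newCalculator() *calculator {
	c := calculatorPool.Get().(*calculator)
	c.buffer.Reset()
	c.sha.Reset()
	return c
}

// returnCalculator puts a no longer needed calculator back into the pool.
func returnCalculator(c *calculator) {
	calculatorPool.Put(c)
}

// digest is a stand-in for the patched store method: encode into the pooled
// buffer, hash the buffer, then hand the scratch state back.
func digest(payload []byte) []byte {
	c := newCalculator()
	defer returnCalculator(c)

	c.buffer.Write(payload) // stand-in for the RLP encoding step
	c.sha.Write(c.buffer.Bytes())
	return c.sha.Sum(nil)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Printf("%d: %x\n", i, digest([]byte{byte(i)}))
		}(i)
	}
	wg.Wait()
}

Because every call gets its own calculator and returns it when done, any number of goroutines can hash at once while still reusing allocations, which is presumably why newHasher in the patch now simply allocates a fresh hasher instead of drawing one from a pool.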