lite: Add synchronization in lite verify (#2396)
* Implement issues 2386: add synchronization in lite verify and change all Certify to Verify * Replace make(chan struct{}, 0) with make(chan struct{}) * Parameterize memroy cache size and add concurrent test * Refactor import order
This commit is contained in:
parent
5173fe9414
commit
8dda3c3b28
|
@ -30,6 +30,7 @@ var (
|
|||
nodeAddr string
|
||||
chainID string
|
||||
home string
|
||||
cacheSize int
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -37,6 +38,7 @@ func init() {
|
|||
LiteCmd.Flags().StringVar(&nodeAddr, "node", "tcp://localhost:26657", "Connect to a Tendermint node at this address")
|
||||
LiteCmd.Flags().StringVar(&chainID, "chain-id", "tendermint", "Specify the Tendermint chain ID")
|
||||
LiteCmd.Flags().StringVar(&home, "home-dir", ".tendermint-lite", "Specify the home directory")
|
||||
LiteCmd.Flags().IntVar(&cacheSize, "cache-size", 10, "Specify the memory trust store cache size")
|
||||
}
|
||||
|
||||
func ensureAddrHasSchemeOrDefaultToTCP(addr string) (string, error) {
|
||||
|
@ -69,7 +71,7 @@ func runProxy(cmd *cobra.Command, args []string) error {
|
|||
node := rpcclient.NewHTTP(nodeAddr, "/websocket")
|
||||
|
||||
logger.Info("Constructing Verifier...")
|
||||
cert, err := proxy.NewVerifier(chainID, home, node, logger)
|
||||
cert, err := proxy.NewVerifier(chainID, home, node, logger, cacheSize)
|
||||
if err != nil {
|
||||
return cmn.ErrorWrap(err, "constructing Verifier")
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ var _ Verifier = (*BaseVerifier)(nil)
|
|||
|
||||
// BaseVerifier lets us check the validity of SignedHeaders at height or
|
||||
// later, requiring sufficient votes (> 2/3) from the given valset.
|
||||
// To certify blocks produced by a blockchain with mutable validator sets,
|
||||
// To verify blocks produced by a blockchain with mutable validator sets,
|
||||
// use the DynamicVerifier.
|
||||
// TODO: Handle unbonding time.
|
||||
type BaseVerifier struct {
|
||||
|
@ -40,15 +40,15 @@ func (bc *BaseVerifier) ChainID() string {
|
|||
}
|
||||
|
||||
// Implements Verifier.
|
||||
func (bc *BaseVerifier) Certify(signedHeader types.SignedHeader) error {
|
||||
func (bc *BaseVerifier) Verify(signedHeader types.SignedHeader) error {
|
||||
|
||||
// We can't certify commits older than bc.height.
|
||||
// We can't verify commits older than bc.height.
|
||||
if signedHeader.Height < bc.height {
|
||||
return cmn.NewError("BaseVerifier height is %v, cannot certify height %v",
|
||||
return cmn.NewError("BaseVerifier height is %v, cannot verify height %v",
|
||||
bc.height, signedHeader.Height)
|
||||
}
|
||||
|
||||
// We can't certify with the wrong validator set.
|
||||
// We can't verify with the wrong validator set.
|
||||
if !bytes.Equal(signedHeader.ValidatorsHash,
|
||||
bc.valset.Hash()) {
|
||||
return lerr.ErrUnexpectedValidators(signedHeader.ValidatorsHash, bc.valset.Hash())
|
||||
|
@ -57,7 +57,7 @@ func (bc *BaseVerifier) Certify(signedHeader types.SignedHeader) error {
|
|||
// Do basic sanity checks.
|
||||
err := signedHeader.ValidateBasic(bc.chainID)
|
||||
if err != nil {
|
||||
return cmn.ErrorWrap(err, "in certify")
|
||||
return cmn.ErrorWrap(err, "in verify")
|
||||
}
|
||||
|
||||
// Check commit signatures.
|
||||
|
@ -65,7 +65,7 @@ func (bc *BaseVerifier) Certify(signedHeader types.SignedHeader) error {
|
|||
bc.chainID, signedHeader.Commit.BlockID,
|
||||
signedHeader.Height, signedHeader.Commit)
|
||||
if err != nil {
|
||||
return cmn.ErrorWrap(err, "in certify")
|
||||
return cmn.ErrorWrap(err, "in verify")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -43,7 +43,7 @@ func TestBaseCert(t *testing.T) {
|
|||
for _, tc := range cases {
|
||||
sh := tc.keys.GenSignedHeader(chainID, tc.height, nil, tc.vals, tc.vals,
|
||||
[]byte("foo"), []byte("params"), []byte("results"), tc.first, tc.last)
|
||||
err := cert.Certify(sh)
|
||||
err := cert.Verify(sh)
|
||||
if tc.proper {
|
||||
assert.Nil(err, "%+v", err)
|
||||
} else {
|
||||
|
|
|
@ -54,11 +54,11 @@ validator set, and that the height of the commit is at least height (or
|
|||
greater).
|
||||
|
||||
SignedHeader.Commit may be signed by a different validator set, it can get
|
||||
certified with a BaseVerifier as long as sufficient signatures from the
|
||||
verified with a BaseVerifier as long as sufficient signatures from the
|
||||
previous validator set are present in the commit.
|
||||
|
||||
DynamicVerifier - this Verifier implements an auto-update and persistence
|
||||
strategy to certify any SignedHeader of the blockchain.
|
||||
strategy to verify any SignedHeader of the blockchain.
|
||||
|
||||
## Provider and PersistentProvider
|
||||
|
||||
|
|
|
@ -2,12 +2,15 @@ package lite
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"fmt"
|
||||
"sync"
|
||||
log "github.com/tendermint/tendermint/libs/log"
|
||||
lerr "github.com/tendermint/tendermint/lite/errors"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const sizeOfPendingMap = 1024
|
||||
|
||||
var _ Verifier = (*DynamicVerifier)(nil)
|
||||
|
||||
// DynamicVerifier implements an auto-updating Verifier. It uses a
|
||||
|
@ -21,6 +24,11 @@ type DynamicVerifier struct {
|
|||
trusted PersistentProvider
|
||||
// This is a source of new info, like a node rpc, or other import method.
|
||||
source Provider
|
||||
|
||||
// pending map for synchronize concurrent verification requests
|
||||
pendingVerifications map[int64]chan struct{}
|
||||
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
// NewDynamicVerifier returns a new DynamicVerifier. It uses the
|
||||
|
@ -31,10 +39,11 @@ type DynamicVerifier struct {
|
|||
// files.Provider. The source provider should be a client.HTTPProvider.
|
||||
func NewDynamicVerifier(chainID string, trusted PersistentProvider, source Provider) *DynamicVerifier {
|
||||
return &DynamicVerifier{
|
||||
logger: log.NewNopLogger(),
|
||||
chainID: chainID,
|
||||
trusted: trusted,
|
||||
source: source,
|
||||
logger: log.NewNopLogger(),
|
||||
chainID: chainID,
|
||||
trusted: trusted,
|
||||
source: source,
|
||||
pendingVerifications: make(map[int64]chan struct{}, sizeOfPendingMap),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,7 +65,40 @@ func (ic *DynamicVerifier) ChainID() string {
|
|||
// ic.trusted and ic.source to prove the new validators. On success, it will
|
||||
// try to store the SignedHeader in ic.trusted if the next
|
||||
// validator can be sourced.
|
||||
func (ic *DynamicVerifier) Certify(shdr types.SignedHeader) error {
|
||||
func (ic *DynamicVerifier) Verify(shdr types.SignedHeader) error {
|
||||
|
||||
// Performs synchronization for multi-threads verification at the same height.
|
||||
ic.mtx.Lock()
|
||||
if pending := ic.pendingVerifications[shdr.Height]; pending != nil {
|
||||
ic.mtx.Unlock()
|
||||
<-pending // pending is chan struct{}
|
||||
} else {
|
||||
pending := make(chan struct{})
|
||||
ic.pendingVerifications[shdr.Height] = pending
|
||||
defer func() {
|
||||
close(pending)
|
||||
ic.mtx.Lock()
|
||||
delete(ic.pendingVerifications, shdr.Height)
|
||||
ic.mtx.Unlock()
|
||||
}()
|
||||
ic.mtx.Unlock()
|
||||
}
|
||||
//Get the exact trusted commit for h, and if it is
|
||||
// equal to shdr, then don't even verify it,
|
||||
// and just return nil.
|
||||
trustedFCSameHeight, err := ic.trusted.LatestFullCommit(ic.chainID, shdr.Height, shdr.Height)
|
||||
if err == nil {
|
||||
// If loading trust commit successfully, and trust commit equal to shdr, then don't verify it,
|
||||
// just return nil.
|
||||
if bytes.Equal(trustedFCSameHeight.SignedHeader.Hash(), shdr.Hash()) {
|
||||
ic.logger.Info(fmt.Sprintf("Load full commit at height %d from cache, there is not need to verify.", shdr.Height))
|
||||
return nil
|
||||
}
|
||||
} else if !lerr.IsErrCommitNotFound(err) {
|
||||
// Return error if it is not CommitNotFound error
|
||||
ic.logger.Info(fmt.Sprintf("Encountered unknown error in loading full commit at height %d.", shdr.Height))
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the latest known full commit <= h-1 from our trusted providers.
|
||||
// The full commit at h-1 contains the valset to sign for h.
|
||||
|
@ -94,9 +136,9 @@ func (ic *DynamicVerifier) Certify(shdr types.SignedHeader) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Certify the signed header using the matching valset.
|
||||
// Verify the signed header using the matching valset.
|
||||
cert := NewBaseVerifier(ic.chainID, trustedFC.Height()+1, trustedFC.NextValidators)
|
||||
err = cert.Certify(shdr)
|
||||
err = cert.Verify(shdr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@ package lite
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
|
@ -49,7 +49,7 @@ func TestInquirerValidPath(t *testing.T) {
|
|||
|
||||
// This should fail validation:
|
||||
sh := fcz[count-1].SignedHeader
|
||||
err = cert.Certify(sh)
|
||||
err = cert.Verify(sh)
|
||||
require.NotNil(err)
|
||||
|
||||
// Adding a few commits in the middle should be insufficient.
|
||||
|
@ -57,7 +57,7 @@ func TestInquirerValidPath(t *testing.T) {
|
|||
err := source.SaveFullCommit(fcz[i])
|
||||
require.Nil(err)
|
||||
}
|
||||
err = cert.Certify(sh)
|
||||
err = cert.Verify(sh)
|
||||
assert.NotNil(err)
|
||||
|
||||
// With more info, we succeed.
|
||||
|
@ -65,7 +65,7 @@ func TestInquirerValidPath(t *testing.T) {
|
|||
err := source.SaveFullCommit(fcz[i])
|
||||
require.Nil(err)
|
||||
}
|
||||
err = cert.Certify(sh)
|
||||
err = cert.Verify(sh)
|
||||
assert.Nil(err, "%+v", err)
|
||||
}
|
||||
|
||||
|
@ -115,18 +115,18 @@ func TestInquirerVerifyHistorical(t *testing.T) {
|
|||
err = source.SaveFullCommit(fcz[7])
|
||||
require.Nil(err, "%+v", err)
|
||||
sh := fcz[8].SignedHeader
|
||||
err = cert.Certify(sh)
|
||||
err = cert.Verify(sh)
|
||||
require.Nil(err, "%+v", err)
|
||||
assert.Equal(fcz[7].Height(), cert.LastTrustedHeight())
|
||||
fc_, err := trust.LatestFullCommit(chainID, fcz[8].Height(), fcz[8].Height())
|
||||
require.NotNil(err, "%+v", err)
|
||||
assert.Equal(fc_, (FullCommit{}))
|
||||
|
||||
// With fcz[9] Certify will update last trusted height.
|
||||
// With fcz[9] Verify will update last trusted height.
|
||||
err = source.SaveFullCommit(fcz[9])
|
||||
require.Nil(err, "%+v", err)
|
||||
sh = fcz[8].SignedHeader
|
||||
err = cert.Certify(sh)
|
||||
err = cert.Verify(sh)
|
||||
require.Nil(err, "%+v", err)
|
||||
assert.Equal(fcz[8].Height(), cert.LastTrustedHeight())
|
||||
fc_, err = trust.LatestFullCommit(chainID, fcz[8].Height(), fcz[8].Height())
|
||||
|
@ -141,13 +141,70 @@ func TestInquirerVerifyHistorical(t *testing.T) {
|
|||
|
||||
// Try to check an unknown seed in the past.
|
||||
sh = fcz[3].SignedHeader
|
||||
err = cert.Certify(sh)
|
||||
err = cert.Verify(sh)
|
||||
require.Nil(err, "%+v", err)
|
||||
assert.Equal(fcz[8].Height(), cert.LastTrustedHeight())
|
||||
|
||||
// Jump all the way forward again.
|
||||
sh = fcz[count-1].SignedHeader
|
||||
err = cert.Certify(sh)
|
||||
err = cert.Verify(sh)
|
||||
require.Nil(err, "%+v", err)
|
||||
assert.Equal(fcz[9].Height(), cert.LastTrustedHeight())
|
||||
}
|
||||
|
||||
func TestConcurrencyInquirerVerify(t *testing.T) {
|
||||
_, require := assert.New(t), require.New(t)
|
||||
trust := NewDBProvider("trust", dbm.NewMemDB()).SetLimit(10)
|
||||
source := NewDBProvider("source", dbm.NewMemDB())
|
||||
|
||||
// Set up the validators to generate test blocks.
|
||||
var vote int64 = 10
|
||||
keys := genPrivKeys(5)
|
||||
nkeys := keys.Extend(1)
|
||||
|
||||
// Construct a bunch of commits, each with one more height than the last.
|
||||
chainID := "inquiry-test"
|
||||
count := 10
|
||||
consHash := []byte("special-params")
|
||||
fcz := make([]FullCommit, count)
|
||||
for i := 0; i < count; i++ {
|
||||
vals := keys.ToValidators(vote, 0)
|
||||
nextVals := nkeys.ToValidators(vote, 0)
|
||||
h := int64(1 + i)
|
||||
appHash := []byte(fmt.Sprintf("h=%d", h))
|
||||
resHash := []byte(fmt.Sprintf("res=%d", h))
|
||||
fcz[i] = keys.GenFullCommit(
|
||||
chainID, h, nil,
|
||||
vals, nextVals,
|
||||
appHash, consHash, resHash, 0, len(keys))
|
||||
// Extend the keys by 1 each time.
|
||||
keys = nkeys
|
||||
nkeys = nkeys.Extend(1)
|
||||
}
|
||||
|
||||
// Initialize a Verifier with the initial state.
|
||||
err := trust.SaveFullCommit(fcz[0])
|
||||
require.Nil(err)
|
||||
cert := NewDynamicVerifier(chainID, trust, source)
|
||||
cert.SetLogger(log.TestingLogger())
|
||||
|
||||
err = source.SaveFullCommit(fcz[7])
|
||||
err = source.SaveFullCommit(fcz[8])
|
||||
require.Nil(err, "%+v", err)
|
||||
sh := fcz[8].SignedHeader
|
||||
|
||||
var wg sync.WaitGroup
|
||||
count = 100
|
||||
errList := make([]error, count)
|
||||
for i := 0; i < count; i++ {
|
||||
wg.Add(1)
|
||||
go func(index int) {
|
||||
errList[index] = cert.Verify(sh)
|
||||
defer wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
for _, err := range errList {
|
||||
require.Nil(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -146,7 +146,7 @@ func GetCertifiedCommit(h int64, client rpcclient.Client, cert lite.Verifier) (t
|
|||
h, sh.Height)
|
||||
}
|
||||
|
||||
if err = cert.Certify(sh); err != nil {
|
||||
if err = cert.Verify(sh); err != nil {
|
||||
return types.SignedHeader{}, err
|
||||
}
|
||||
|
||||
|
|
|
@ -8,12 +8,12 @@ import (
|
|||
lclient "github.com/tendermint/tendermint/lite/client"
|
||||
)
|
||||
|
||||
func NewVerifier(chainID, rootDir string, client lclient.SignStatusClient, logger log.Logger) (*lite.DynamicVerifier, error) {
|
||||
func NewVerifier(chainID, rootDir string, client lclient.SignStatusClient, logger log.Logger, cacheSize int) (*lite.DynamicVerifier, error) {
|
||||
|
||||
logger = logger.With("module", "lite/proxy")
|
||||
logger.Info("lite/proxy/NewVerifier()...", "chainID", chainID, "rootDir", rootDir, "client", client)
|
||||
|
||||
memProvider := lite.NewDBProvider("trusted.mem", dbm.NewMemDB()).SetLimit(10)
|
||||
memProvider := lite.NewDBProvider("trusted.mem", dbm.NewMemDB()).SetLimit(cacheSize)
|
||||
lvlProvider := lite.NewDBProvider("trusted.lvl", dbm.NewDB("trust-base", dbm.LevelDBBackend, rootDir))
|
||||
trust := lite.NewMultiProvider(
|
||||
memProvider,
|
||||
|
|
|
@ -134,10 +134,10 @@ func (w Wrapper) Commit(height *int64) (*ctypes.ResultCommit, error) {
|
|||
}
|
||||
rpcclient.WaitForHeight(w.Client, *height, nil)
|
||||
res, err := w.Client.Commit(height)
|
||||
// if we got it, then certify it
|
||||
// if we got it, then verify it
|
||||
if err == nil {
|
||||
sh := res.SignedHeader
|
||||
err = w.cert.Certify(sh)
|
||||
err = w.cert.Verify(sh)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
|
|
@ -8,6 +8,6 @@ import (
|
|||
// Verifier must know the current or recent set of validitors by some other
|
||||
// means.
|
||||
type Verifier interface {
|
||||
Certify(sheader types.SignedHeader) error
|
||||
Verify(sheader types.SignedHeader) error
|
||||
ChainID() string
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue