[#1169] Step 5 -Get the suggested scan ranges from the wallet database

- initial action in place
- verify loop WIP

[#1169] Step 5 -Get the suggested scan ranges from the wallet database

- firstUnenhancedHeight fix

[#1169] Step 5 -Get the suggested scan ranges from the wallet database

- first working version implemented
- download, scan and enhance actions are dependent on the context values only
This commit is contained in:
Lukas Korba 2023-08-03 11:07:57 +02:00
parent 4a78b4bd16
commit 086827fa24
22 changed files with 181 additions and 49 deletions

View File

@ -12,6 +12,7 @@ actor ActionContext {
var prevState: CBPState?
var syncControlData: SyncControlData
var totalProgressRange: CompactBlockRange = 0...0
var lastScannedHeight: BlockHeight?
var lastDownloadedHeight: BlockHeight?
var lastEnhancedHeight: BlockHeight?
@ -26,6 +27,7 @@ actor ActionContext {
}
func update(syncControlData: SyncControlData) async { self.syncControlData = syncControlData }
func update(totalProgressRange: CompactBlockRange) async { self.totalProgressRange = totalProgressRange }
func update(lastScannedHeight: BlockHeight) async { self.lastScannedHeight = lastScannedHeight }
func update(lastDownloadedHeight: BlockHeight) async { self.lastDownloadedHeight = lastDownloadedHeight }
func update(lastEnhancedHeight: BlockHeight?) async { self.lastEnhancedHeight = lastEnhancedHeight }
}
@ -36,6 +38,7 @@ enum CBPState: CaseIterable {
case validateServer
case updateSubtreeRoots
case updateChainTip
case validatePreviousWalletSession
case computeSyncControlData
case download
case scan

View File

@ -21,7 +21,12 @@ extension ClearAlreadyScannedBlocksAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
let lastScannedHeight = try await transactionRepository.lastScannedHeight()
guard let lastScannedHeight = await context.lastScannedHeight else {
fatalError("it must be valid")
return context
}
//let lastScannedHeight = //try await transactionRepository.lastScannedHeight()
try await storage.clear(upTo: lastScannedHeight)
await context.update(state: .enhance)

View File

@ -23,7 +23,8 @@ extension ClearCacheAction: Action {
if await context.prevState == .idle {
await context.update(state: .migrateLegacyCacheDB)
} else {
await context.update(state: .finished)
//await context.update(state: .finished) // Linear
await context.update(state: .validatePreviousWalletSession)
}
return context
}

View File

@ -64,6 +64,7 @@ extension ComputeSyncControlDataAction: Action {
firstUnenhancedHeight: enhanceStart
)
await context.update(lastScannedHeight: latestScannedHeight)
await context.update(lastDownloadedHeight: latestScannedHeight)
await context.update(syncControlData: syncControlData)
await context.update(totalProgressRange: latestScannedHeight...latestBlockHeight)
@ -72,7 +73,7 @@ extension ComputeSyncControlDataAction: Action {
if latestBlockHeight < latestScannedHeight || latestBlockHeight == latestScannedHeight {
await context.update(state: .finished)
} else {
await context.update(state: .fetchUTXO)
await context.update(state: .download)
}
return context

View File

@ -30,16 +30,16 @@ extension DownloadAction: Action {
var removeBlocksCacheWhenFailed: Bool { true }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
guard let lastScannedHeight = await context.syncControlData.latestScannedHeight else {
guard let lastScannedHeight = await context.lastScannedHeight else {
return await update(context: context)
}
let config = await configProvider.config
let lastScannedHeightDB = try await transactionRepository.lastScannedHeight()
// let lastScannedHeightDB = try await transactionRepository.lastScannedHeight()
let latestBlockHeight = await context.syncControlData.latestBlockHeight
// This action is executed for each batch (batch size is 100 blocks by default) until all the blocks in whole `downloadRange` are downloaded.
// So the right range for this batch must be computed.
let batchRangeStart = max(lastScannedHeightDB, lastScannedHeight)
let batchRangeStart = lastScannedHeight//max(lastScannedHeightDB, lastScannedHeight)
let batchRangeEnd = min(latestBlockHeight, batchRangeStart + config.batchSize)
guard batchRangeStart <= batchRangeEnd else {
@ -47,10 +47,11 @@ extension DownloadAction: Action {
}
let batchRange = batchRangeStart...batchRangeEnd
let downloadLimit = batchRange.upperBound + (2 * config.batchSize)
let potentialDownloadLimit = batchRange.upperBound + (2 * config.batchSize)
let downloadLimit = await context.syncControlData.latestBlockHeight >= potentialDownloadLimit ? potentialDownloadLimit : batchRangeEnd
logger.debug("Starting download with range: \(batchRange.lowerBound)...\(batchRange.upperBound)")
await downloader.update(latestDownloadedBlockHeight: batchRange.lowerBound)
await downloader.update(latestDownloadedBlockHeight: batchRange.lowerBound, force: true) // SbS
try await downloader.setSyncRange(lastScannedHeight...latestBlockHeight, batchSize: config.batchSize)
await downloader.setDownloadLimit(downloadLimit)
await downloader.startDownload(maxBlockBufferSize: config.downloadBufferSize)

View File

@ -22,15 +22,18 @@ final class EnhanceAction {
func decideWhatToDoNext(context: ActionContext, lastScannedHeight: BlockHeight) async -> ActionContext {
guard await context.syncControlData.latestScannedHeight != nil else {
await context.update(state: .clearCache)
await context.update(state: .clearCache) // linear
// await context.update(state: .validatePreviousWalletSession) // SbS
return context
}
let latestBlockHeight = await context.syncControlData.latestBlockHeight
if lastScannedHeight >= latestBlockHeight {
await context.update(state: .clearCache)
await context.update(state: .clearCache) // linear
// await context.update(state: .validatePreviousWalletSession) // SbS
} else {
await context.update(state: .download)
await context.update(state: .download) // Linear
// await context.update(state: .validatePreviousWalletSession) // SbS
}
return context
@ -49,7 +52,11 @@ extension EnhanceAction: Action {
// download and scan.
let config = await configProvider.config
let lastScannedHeight = try await transactionRepository.lastScannedHeight()
//let lastScannedHeight = try await transactionRepository.lastScannedHeight()
guard let lastScannedHeight = await context.lastScannedHeight else {
await context.update(state: .validatePreviousWalletSession)
return context
}
guard let firstUnenhancedHeight = await context.syncControlData.firstUnenhancedHeight else {
return await decideWhatToDoNext(context: context, lastScannedHeight: lastScannedHeight)

View File

@ -23,7 +23,8 @@ extension SaplingParamsAction: Action {
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
logger.debug("Fetching sapling parameters")
try await saplingParametersHandler.handleIfNeeded()
await context.update(state: .download)
// await context.update(state: .computeSyncControlData) // Linear
await context.update(state: .updateSubtreeRoots) // SbS
return context
}

View File

@ -30,16 +30,16 @@ extension ScanAction: Action {
var removeBlocksCacheWhenFailed: Bool { true }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
guard let lastScannedHeight = await context.syncControlData.latestScannedHeight else {
guard let lastScannedHeight = await context.lastScannedHeight else {
return await update(context: context)
}
let config = await configProvider.config
let lastScannedHeightDB = try await transactionRepository.lastScannedHeight()
//let lastScannedHeightDB = try await transactionRepository.lastScannedHeight()
let latestBlockHeight = await context.syncControlData.latestBlockHeight
// This action is executed for each batch (batch size is 100 blocks by default) until all the blocks in whole `scanRange` are scanned.
// So the right range for this batch must be computed.
let batchRangeStart = max(lastScannedHeightDB, lastScannedHeight)
let batchRangeStart = lastScannedHeight//max(lastScannedHeightDB, lastScannedHeight)
let batchRangeEnd = min(latestBlockHeight, batchRangeStart + config.batchSize)
guard batchRangeStart <= batchRangeEnd else {
@ -58,6 +58,16 @@ extension ScanAction: Action {
)
self?.logger.debug("progress: \(progress)")
await didUpdate(.progressPartialUpdate(.syncing(progress)))
// ScanAction is controlled locally so it must report back the updated scanned height
await context.update(lastScannedHeight: lastScannedHeight)
// let prevSyncControlData = await context.syncControlData
// let newSyncControlData = SyncControlData(
// latestBlockHeight: prevSyncControlData.latestBlockHeight,
// latestScannedHeight: lastScannedHeight,
// firstUnenhancedHeight: prevSyncControlData.firstUnenhancedHeight
// )
// await context.update(syncControlData: newSyncControlData)
}
return await update(context: context)

View File

@ -28,9 +28,7 @@ extension UpdateChainTipAction: Action {
logger.info("Latest block height is \(latestBlockHeight)")
try await rustBackend.updateChainTip(height: Int32(latestBlockHeight))
// TODO: [#1169] Switching back to linear sync for now before step 5 & 6 are implemented
// https://github.com/zcash/ZcashLightClientKit/issues/1169
await context.update(state: .computeSyncControlData)
await context.update(state: .validatePreviousWalletSession)
return context
}

View File

@ -27,7 +27,7 @@ extension UpdateSubtreeRootsAction: Action {
request.shieldedProtocol = .sapling
request.maxEntries = 65536
logger.info("Attempt to get subtree roots, this may fail because lightwalletd may not support DAG sync.")
logger.info("Attempt to get subtree roots, this may fail because lightwalletd may not support Spend before Sync.")
let stream = service.getSubtreeRoots(request)
var roots: [SubtreeRoot] = []
@ -42,10 +42,10 @@ extension UpdateSubtreeRootsAction: Action {
err = error
}
// In case of error, the lightwalletd doesn't support DAG sync -> switching to linear sync.
// In case of error, the lightwalletd doesn't support Spend before Sync -> switching to linear sync.
// Likewise, no subtree roots results in switching to linear sync.
if err != nil || roots.isEmpty {
logger.info("DAG sync is not possible, switching to linear sync.")
logger.info("Spend before Sync is not possible, switching to linear sync.")
await context.update(state: .computeSyncControlData)
} else {
logger.info("Sapling tree has \(roots.count) subtrees")

View File

@ -0,0 +1,85 @@
//
// ValidatePreviousWalletSessionAction.swift
//
//
// Created by Lukáš Korba on 02.08.2023.
//
import Foundation
final class ValidatePreviousWalletSessionAction {
let rustBackend: ZcashRustBackendWelding
let service: LightWalletService
let logger: Logger
init(container: DIContainer) {
service = container.resolve(LightWalletService.self)
rustBackend = container.resolve(ZcashRustBackendWelding.self)
logger = container.resolve(Logger.self)
}
}
extension ValidatePreviousWalletSessionAction: Action {
var removeBlocksCacheWhenFailed: Bool { false }
func run(with context: ActionContext, didUpdate: @escaping (CompactBlockProcessor.Event) async -> Void) async throws -> ActionContext {
logger.info("Getting the suggested scan ranges from the wallet database.")
let scanRanges = try await rustBackend.suggestScanRanges()
print("__LD count \(scanRanges.count) first range \(scanRanges.first)")
// Run the following loop until the wallet's view of the chain tip
// as of the previous wallet session is valid.
// while true {
// If there is a range of blocks that needs to be verified, it will always
// be returned as the first element of the vector of suggested ranges.
if let firstRange = scanRanges.first {
//if firstRange.priority == .verify {
let lowerBound = firstRange.range.lowerBound - 1
let upperBound = firstRange.range.upperBound - 1
let syncControlData = SyncControlData(
latestBlockHeight: upperBound,
latestScannedHeight: lowerBound,
firstUnenhancedHeight: lowerBound + 1
)
logger.debug("""
Init numbers:
latestBlockHeight [BC]: \(upperBound)
latestScannedHeight [DB]: \(lowerBound)
firstUnenhancedHeight [DB]: \(lowerBound + 1)
""")
if scanRanges.count == 1 {
print("cool")
}
await context.update(lastScannedHeight: lowerBound)
await context.update(lastDownloadedHeight: lowerBound)
await context.update(syncControlData: syncControlData)
await context.update(totalProgressRange: lowerBound...upperBound)
await context.update(state: .download)
// } else {
// print("cool")
// }
} else {
await context.update(state: .finished)
}
// } else {
// // Nothing to verify; break out of the loop
// break
// }
// }
// TODO: [#1171] Switching back to linear sync for now before step 7 are implemented
// https://github.com/zcash/ZcashLightClientKit/issues/1171
// await context.update(state: .computeSyncControlData)
return context
}
func stop() async { }
}

View File

@ -52,7 +52,7 @@ extension ValidateServerAction: Action {
throw ZcashError.compactBlockProcessorWrongConsensusBranchId(localBranch, remoteBranchID)
}
await context.update(state: .updateSubtreeRoots)
await context.update(state: .fetchUTXO)
return context
}

View File

@ -218,6 +218,8 @@ actor CompactBlockProcessor {
action = UpdateSubtreeRootsAction(container: container)
case .updateChainTip:
action = UpdateChainTipAction(container: container)
case .validatePreviousWalletSession:
action = ValidatePreviousWalletSessionAction(container: container)
case .computeSyncControlData:
action = ComputeSyncControlDataAction(container: container, configProvider: configProvider)
case .download:
@ -593,6 +595,8 @@ extension CompactBlockProcessor {
break
case .updateChainTip:
break
case .validatePreviousWalletSession:
break
case .computeSyncControlData:
break
case .download:

View File

@ -55,7 +55,7 @@ protocol BlockDownloader {
/// Updates the internal in memory value of latest downloaded block height. This way the `BlockDownloader` works with the current latest height and can
/// continue on parallel downloading of next batch.
func update(latestDownloadedBlockHeight: BlockHeight) async
func update(latestDownloadedBlockHeight: BlockHeight, force: Bool) async
/// Provides the value of latest downloaded height.
func latestDownloadedBlockHeight() async -> BlockHeight
/// In case rewind is needed, the latestDownloadedBlockHeight is rewritten forcefully.
@ -253,8 +253,8 @@ extension BlockDownloaderImpl: BlockDownloader {
self.latestDownloadedBlockHeight = latestDownloadedBlockHeight ?? -1
}
func update(latestDownloadedBlockHeight: BlockHeight) async {
if latestDownloadedBlockHeight >= self.latestDownloadedBlockHeight {
func update(latestDownloadedBlockHeight: BlockHeight, force: Bool = false) async {
if latestDownloadedBlockHeight >= self.latestDownloadedBlockHeight || force {
self.latestDownloadedBlockHeight = latestDownloadedBlockHeight
}
}

View File

@ -40,7 +40,7 @@ extension BlockScannerImpl: BlockScanner {
logger.debug("Going to scan blocks in range: \(range)")
try Task.checkCancellation()
let scanStartHeight = try await transactionRepository.lastScannedHeight()
let scanStartHeight = range.lowerBound//try await transactionRepository.lastScannedHeight()
let targetScanHeight = range.upperBound
var scannedNewBlocks = false
@ -65,11 +65,13 @@ extension BlockScannerImpl: BlockScanner {
let scanFinishTime = Date()
if let lastScannedBlock = try await transactionRepository.lastScannedBlock() {
lastScannedHeight = lastScannedBlock.height
// if let lastScannedBlock = try await transactionRepository.lastScannedBlock() {
// lastScannedHeight = lastScannedBlock.height
lastScannedHeight = startHeight + Int(batchSize) - 1
await latestBlocksDataProvider.updateLatestScannedHeight(lastScannedHeight)
await latestBlocksDataProvider.updateLatestScannedTime(TimeInterval(lastScannedBlock.time))
}
// await latestBlocksDataProvider.updateLatestScannedTime(TimeInterval(lastScannedBlock.time))
// }
// lastScannedHeight = targetScanHeight
scannedNewBlocks = previousScannedHeight != lastScannedHeight
if scannedNewBlocks {

View File

@ -8,6 +8,20 @@
import Foundation
struct ScanRange {
enum Priority: UInt8 {
case unknown = 0
case scanned = 10
case historic = 20
case openAdjacent = 30
case foundNote = 40
case chainTip = 50
case verify = 60
init(_ value: UInt8) {
self = Priority(rawValue: value) ?? .unknown
}
}
let range: Range<BlockHeight>
let priority: UInt8
let priority: Priority
}

View File

@ -50,7 +50,7 @@ actor LatestBlocksDataProviderImpl: LatestBlocksDataProvider {
latestScannedTime = TimeInterval(time)
}
}
func updateBlockData() async {
if let newLatestBlockHeight = try? await service.latestBlockHeight(),
latestBlockHeight < newLatestBlockHeight {

View File

@ -585,7 +585,7 @@ actor ZcashRustBackend: ZcashRustBackendWelding {
BlockHeight(scanRange.start),
BlockHeight(scanRange.end)
)),
priority: scanRange.priority
priority: ScanRange.Priority(scanRange.priority)
)
)
}

View File

@ -81,8 +81,8 @@ final class ComputeSyncControlDataActionTests: ZcashTestCase {
let nextState = await nextContext.state
XCTAssertTrue(
nextState == .fetchUTXO,
"nextContext after .computeSyncControlData is expected to be .fetchUTXO but received \(nextState)"
nextState == .download,
"nextContext after .computeSyncControlData is expected to be .download but received \(nextState)"
)
} catch {
XCTFail("testComputeSyncControlDataAction_checksBeforeSyncCase is not expected to fail. \(error)")

View File

@ -28,8 +28,8 @@ final class SaplingParamsActionTests: ZcashTestCase {
XCTAssertTrue(saplingParametersHandlerMock.handleIfNeededCalled, "saplingParametersHandler.handleIfNeeded() is expected to be called.")
let nextState = await nextContext.state
XCTAssertTrue(
nextState == .download,
"nextContext after .handleSaplingParams is expected to be .download but received \(nextState)"
nextState == .updateSubtreeRoots,
"nextContext after .handleSaplingParams is expected to be .updateSubtreeRoots but received \(nextState)"
)
} catch {
XCTFail("testSaplingParamsAction_NextAction is not expected to fail. \(error)")

View File

@ -31,8 +31,8 @@ final class ValidateServerActionTests: ZcashTestCase {
let nextContext = try await validateServerAction.run(with: .init(state: .validateServer)) { _ in }
let nextState = await nextContext.state
XCTAssertTrue(
nextState == .updateSubtreeRoots,
"nextContext after .validateServer is expected to be .updateSubtreeRoots but received \(nextState)"
nextState == .fetchUTXO,
"nextContext after .validateServer is expected to be .fetchUTXO but received \(nextState)"
)
} catch {
XCTFail("testValidateServerAction_NextAction is not expected to fail. \(error)")

View File

@ -97,17 +97,17 @@ class BlockDownloaderMock: BlockDownloader {
// MARK: - update
var updateLatestDownloadedBlockHeightCallsCount = 0
var updateLatestDownloadedBlockHeightCalled: Bool {
return updateLatestDownloadedBlockHeightCallsCount > 0
var updateLatestDownloadedBlockHeightForceCallsCount = 0
var updateLatestDownloadedBlockHeightForceCalled: Bool {
return updateLatestDownloadedBlockHeightForceCallsCount > 0
}
var updateLatestDownloadedBlockHeightReceivedLatestDownloadedBlockHeight: BlockHeight?
var updateLatestDownloadedBlockHeightClosure: ((BlockHeight) async -> Void)?
var updateLatestDownloadedBlockHeightForceReceivedArguments: (latestDownloadedBlockHeight: BlockHeight, force: Bool)?
var updateLatestDownloadedBlockHeightForceClosure: ((BlockHeight, Bool) async -> Void)?
func update(latestDownloadedBlockHeight: BlockHeight) async {
updateLatestDownloadedBlockHeightCallsCount += 1
updateLatestDownloadedBlockHeightReceivedLatestDownloadedBlockHeight = latestDownloadedBlockHeight
await updateLatestDownloadedBlockHeightClosure!(latestDownloadedBlockHeight)
func update(latestDownloadedBlockHeight: BlockHeight, force: Bool) async {
updateLatestDownloadedBlockHeightForceCallsCount += 1
updateLatestDownloadedBlockHeightForceReceivedArguments = (latestDownloadedBlockHeight: latestDownloadedBlockHeight, force: force)
await updateLatestDownloadedBlockHeightForceClosure!(latestDownloadedBlockHeight, force)
}
// MARK: - latestDownloadedBlockHeight